diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Duration.cpp | 45 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Message.cpp | 4 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 36 | ||||
-rw-r--r-- | cpp/src/tests/qpid_recv.cpp | 6 | ||||
-rw-r--r-- | cpp/src/tests/qpid_send.cpp | 2 |
11 files changed, 79 insertions, 29 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 40590993eb..1695c6fa17 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -679,6 +679,7 @@ set (qpidclient_SOURCES qpid/messaging/AddressParser.cpp qpid/messaging/Connection.cpp qpid/messaging/ConnectionImpl.h + qpid/messaging/Duration.cpp qpid/messaging/ListContent.cpp qpid/messaging/ListView.cpp qpid/messaging/MapContent.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 7da7f42dcb..d6867701b4 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -707,6 +707,7 @@ libqpidclient_la_SOURCES = \ qpid/messaging/AddressParser.h \ qpid/messaging/AddressParser.cpp \ qpid/messaging/Connection.cpp \ + qpid/messaging/Duration.cpp \ qpid/messaging/ListContent.cpp \ qpid/messaging/ListView.cpp \ qpid/messaging/MapContent.cpp \ diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 8e501511e4..3f5cccfedb 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -26,6 +26,7 @@ #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/log/Statement.h" #include "qpid/messaging/Address.h" +#include "qpid/messaging/Duration.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/types/Variant.h" @@ -276,7 +277,7 @@ void populateHeaders(qpid::messaging::Message& message, const MessageProperties* messageProperties) { if (deliveryProperties) { - message.setTtl(deliveryProperties->getTtl()); + message.setTtl(qpid::messaging::Duration(deliveryProperties->getTtl())); message.setDurable(deliveryProperties->getDeliveryMode() == DELIVERY_MODE_PERSISTENT); MessageImplAccess::get(message).redelivered = deliveryProperties->getRedelivered(); } diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index e75368cda7..d0d945b934 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -47,7 +47,7 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); } translate(from.getHeaders(), message.getMessageProperties().getApplicationHeaders()); - message.getDeliveryProperties().setTtl(from.getTtl()); + message.getDeliveryProperties().setTtl(from.getTtl().getMilliseconds()); if (from.getDurable()) { message.getDeliveryProperties().setDeliveryMode(DELIVERY_MODE_PERSISTENT); } diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 2f52efbceb..c3367f8ab4 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -30,6 +30,7 @@ namespace client { namespace amqp0_10 { using qpid::messaging::Receiver; +using qpid::messaging::Duration; void ReceiverImpl::received(qpid::messaging::Message&) { @@ -163,7 +164,7 @@ bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging: } else { sync(session).messageFlush(destination); startFlow();//reallocate credit - return getImpl(message, 0); + return getImpl(message, Duration::IMMEDIATE); } } diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 245ec878be..209ab93909 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -267,8 +267,9 @@ bool SessionImpl::accept(ReceiverImpl* receiver, qpid::sys::Duration adjust(qpid::messaging::Duration timeout) { - if (timeout < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { - return timeout * qpid::sys::TIME_MSEC; + uint64_t ms = timeout.getMilliseconds(); + if (ms < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { + return ms * qpid::sys::TIME_MSEC; } else { return qpid::sys::TIME_INFINITE; } diff --git a/cpp/src/qpid/messaging/Duration.cpp b/cpp/src/qpid/messaging/Duration.cpp new file mode 100644 index 0000000000..c415b70fbe --- /dev/null +++ b/cpp/src/qpid/messaging/Duration.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/messaging/Duration.h" +#include <limits> + +namespace qpid { +namespace messaging { + +Duration::Duration(uint64_t ms) : milliseconds(ms) {} +uint64_t Duration::getMilliseconds() const { return milliseconds; } + +Duration operator*(const Duration& duration, uint64_t multiplier) +{ + return Duration(duration.getMilliseconds() * multiplier); +} + +Duration operator*(uint64_t multiplier, const Duration& duration) +{ + return Duration(duration.getMilliseconds() * multiplier); +} + +const Duration Duration::INFINITE(std::numeric_limits<uint64_t>::max()); +const Duration Duration::IMMEDIATE(0); +const Duration Duration::SECOND(1000); +const Duration Duration::MINUTE(SECOND * 60); + +}} // namespace qpid::messaging diff --git a/cpp/src/qpid/messaging/Message.cpp b/cpp/src/qpid/messaging/Message.cpp index d953b3ff75..822659f6ef 100644 --- a/cpp/src/qpid/messaging/Message.cpp +++ b/cpp/src/qpid/messaging/Message.cpp @@ -52,8 +52,8 @@ const std::string& Message::getUserId() const { return impl->userId; } void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; } const std::string& Message::getCorrelationId() const { return impl->correlationId; } -void Message::setTtl(Duration ttl) { impl->ttl = ttl; } -Duration Message::getTtl() const { return impl->ttl; } +void Message::setTtl(Duration ttl) { impl->ttl = ttl.getMilliseconds(); } +Duration Message::getTtl() const { return Duration(impl->ttl); } void Message::setDurable(bool durable) { impl->durable = durable; } bool Message::getDurable() const { return impl->durable; } diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 56e86d101f..c4659493ce 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -131,7 +131,7 @@ struct MessagingFixture : public BrokerFixture Message out(Uuid(true).str()); s.send(out); Message in; - BOOST_CHECK(r.fetch(in, 5*DURATION_SEC)); + BOOST_CHECK(r.fetch(in, 5*Duration::SECOND)); BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); r.close(); s.close(); @@ -197,7 +197,7 @@ struct MultiQueueFixture : MessagingFixture } }; -std::vector<std::string> fetch(Receiver& receiver, int count, Duration timeout=DURATION_SEC*5) +std::vector<std::string> fetch(Receiver& receiver, int count, Duration timeout=Duration::SECOND*5) { std::vector<std::string> data; Message message; @@ -216,7 +216,7 @@ void send(Sender& sender, uint count = 1, uint start = 1, const std::string& bas } void receive(Receiver& receiver, uint count = 1, uint start = 1, - const std::string& base = "Message", Duration timeout=DURATION_SEC*5) + const std::string& base = "Message", Duration timeout=Duration::SECOND*5) { for (uint i = start; i < start + count; ++i) { BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str()); @@ -230,7 +230,7 @@ QPID_AUTO_TEST_CASE(testSimpleSendReceive) Message out("test-message"); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * DURATION_SEC); + Message in = receiver.fetch(Duration::SECOND * 5); fix.session.acknowledge(); BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); } @@ -247,7 +247,7 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeaders) Receiver receiver = fix.session.createReceiver(fix.queue); Message in; for (uint i = 0; i < 10; ++i) { - BOOST_CHECK(receiver.fetch(in, 5 * DURATION_SEC)); + BOOST_CHECK(receiver.fetch(in, Duration::SECOND * 5)); BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i); fix.session.acknowledge(); @@ -301,7 +301,7 @@ QPID_AUTO_TEST_CASE(testSimpleTopic) BOOST_CHECK_EQUAL(fetch(sub1, 4), boost::assign::list_of<std::string>("two")("three")("four")("five")); BOOST_CHECK_EQUAL(fetch(sub3, 2), boost::assign::list_of<std::string>("four")("five")); Message in; - BOOST_CHECK(!sub2.fetch(in, 0));//TODO: or should this raise an error? + BOOST_CHECK(!sub2.fetch(in, Duration::IMMEDIATE));//TODO: or should this raise an error? //TODO: check pending messages... @@ -324,7 +324,7 @@ QPID_AUTO_TEST_CASE(testNextReceiver) for (uint i = 0; i < fix.queues.size(); i++) { Message msg; - BOOST_CHECK(fix.session.nextReceiver().fetch(msg, DURATION_SEC)); + BOOST_CHECK(fix.session.nextReceiver().fetch(msg, Duration::SECOND)); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); } } @@ -346,7 +346,7 @@ QPID_AUTO_TEST_CASE(testMapMessage) content.encode(); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * DURATION_SEC); + Message in = receiver.fetch(5 * Duration::SECOND); MapView view(in); BOOST_CHECK_EQUAL(view["abc"].asString(), "def"); BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f); @@ -369,7 +369,7 @@ QPID_AUTO_TEST_CASE(testMapMessageWithInitial) content.encode(); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * DURATION_SEC); + Message in = receiver.fetch(5 * Duration::SECOND); MapView view(in); BOOST_CHECK_EQUAL(view["abc"].asString(), "def"); BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f); @@ -389,7 +389,7 @@ QPID_AUTO_TEST_CASE(testListMessage) content.encode(); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * DURATION_SEC); + Message in = receiver.fetch(5 * Duration::SECOND); ListView view(in); BOOST_CHECK_EQUAL(view.size(), content.size()); BOOST_CHECK_EQUAL(view.front().asString(), "abc"); @@ -423,7 +423,7 @@ QPID_AUTO_TEST_CASE(testListMessageWithInitial) content.encode(); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * DURATION_SEC); + Message in = receiver.fetch(5 * Duration::SECOND); ListView view(in); BOOST_CHECK_EQUAL(view.size(), content.size()); BOOST_CHECK_EQUAL(view.front().asString(), "abc"); @@ -452,10 +452,10 @@ QPID_AUTO_TEST_CASE(testReject) Message m2("accept-me"); sender.send(m2); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * DURATION_SEC); + Message in = receiver.fetch(5 * Duration::SECOND); BOOST_CHECK_EQUAL(in.getContent(), m1.getContent()); fix.session.reject(in); - in = receiver.fetch(5 * DURATION_SEC); + in = receiver.fetch(5 * Duration::SECOND); BOOST_CHECK_EQUAL(in.getContent(), m2.getContent()); fix.session.acknowledge(); } @@ -834,17 +834,17 @@ QPID_AUTO_TEST_CASE(testTx) send(sender2, 5, 1, "B"); ssn2.commit(); receive(receiver1, 5, 1, "B");//(only those from sender2 should be received) - BOOST_CHECK(!receiver1.fetch(in, 0));//check there are no more messages + BOOST_CHECK(!receiver1.fetch(in, Duration::IMMEDIATE));//check there are no more messages ssn1.rollback(); receive(receiver2, 5, 1, "B"); - BOOST_CHECK(!receiver2.fetch(in, 0));//check there are no more messages + BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE));//check there are no more messages ssn2.rollback(); receive(receiver1, 5, 1, "B"); - BOOST_CHECK(!receiver1.fetch(in, 0));//check there are no more messages + BOOST_CHECK(!receiver1.fetch(in, Duration::IMMEDIATE));//check there are no more messages ssn1.commit(); //check neither receiver gets any more messages: - BOOST_CHECK(!receiver1.fetch(in, 0)); - BOOST_CHECK(!receiver2.fetch(in, 0)); + BOOST_CHECK(!receiver1.fetch(in, Duration::IMMEDIATE)); + BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE)); } QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp index 3ee8208a0b..994ef00960 100644 --- a/cpp/src/tests/qpid_recv.cpp +++ b/cpp/src/tests/qpid_recv.cpp @@ -96,8 +96,8 @@ struct Options : public qpid::Options Duration getTimeout() { - if (forever) return INFINITE_DURATION; - else return timeout*DURATION_SEC; + if (forever) return Duration::INFINITE; + else return Duration::SECOND*timeout; } bool parse(int argc, char** argv) @@ -173,7 +173,7 @@ int main(int argc, char ** argv) if (msg.getReplyTo()) std::cout << "ReplyTo: " << msg.getReplyTo() << std::endl; if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl; if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl; - if (msg.getTtl()) std::cout << "TTL: " << msg.getTtl() << std::endl; + if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl; if (msg.getDurable()) std::cout << "Durable: true" << std::endl; if (msg.isRedelivered()) std::cout << "Redelivered: true" << std::endl; std::cout << "Headers: " << msg.getHeaders() << std::endl; diff --git a/cpp/src/tests/qpid_send.cpp b/cpp/src/tests/qpid_send.cpp index eabcbdae21..a75d21e0a5 100644 --- a/cpp/src/tests/qpid_send.cpp +++ b/cpp/src/tests/qpid_send.cpp @@ -192,7 +192,7 @@ int main(int argc, char ** argv) Message msg; msg.setDurable(opts.durable); if (opts.ttl) { - msg.setTtl(opts.ttl); + msg.setTtl(Duration(opts.ttl)); } if (!opts.replyto.empty()) msg.setReplyTo(Address(opts.replyto)); if (!opts.userid.empty()) msg.setUserId(opts.userid); |