diff options
-rw-r--r-- | cpp/examples/messaging/drain.cpp | 8 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Duration.h | 39 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Message.h | 11 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Receiver.h | 10 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Session.h | 5 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 12 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.h | 22 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Message.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Receiver.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/ReceiverImpl.h | 9 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Session.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/SessionImpl.h | 6 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 24 | ||||
-rw-r--r-- | cpp/src/tests/qpid_recv.cpp | 8 | ||||
-rw-r--r-- | cpp/src/tests/qpid_stream.cpp | 27 |
18 files changed, 138 insertions, 90 deletions
diff --git a/cpp/examples/messaging/drain.cpp b/cpp/examples/messaging/drain.cpp index 21c7df7388..bd18fd3884 100644 --- a/cpp/examples/messaging/drain.cpp +++ b/cpp/examples/messaging/drain.cpp @@ -28,14 +28,10 @@ #include <qpid/Options.h> #include <qpid/log/Logger.h> #include <qpid/log/Options.h> -#include <qpid/sys/Time.h> #include <iostream> using namespace qpid::messaging; -using qpid::sys::Duration; -using qpid::sys::TIME_INFINITE; -using qpid::sys::TIME_SEC; struct Options : public qpid::Options { @@ -67,8 +63,8 @@ struct Options : public qpid::Options Duration getTimeout() { - if (forever) return TIME_INFINITE; - else return timeout*TIME_SEC; + if (forever) return INFINITE_DURATION; + else return timeout*DURATION_SEC; } bool parse(int argc, char** argv) diff --git a/cpp/include/qpid/messaging/Duration.h b/cpp/include/qpid/messaging/Duration.h new file mode 100644 index 0000000000..5f95acf04d --- /dev/null +++ b/cpp/include/qpid/messaging/Duration.h @@ -0,0 +1,39 @@ +#ifndef QPID_MESSAGING_DURATION_H +#define QPID_MESSAGING_DURATION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/IntegerTypes.h" +#include <limits> + +namespace qpid { +namespace messaging { + +/** + * A duration is a time in milliseconds. + */ +typedef uint64_t Duration; +const Duration INFINITE_DURATION = std::numeric_limits<uint64_t>::max(); +const Duration DURATION_SEC = 1000; + +}} // namespace qpid::messaging + +#endif /*!QPID_MESSAGING_DURATION_H*/ diff --git a/cpp/include/qpid/messaging/Message.h b/cpp/include/qpid/messaging/Message.h index d1028ff330..30e15d79a3 100644 --- a/cpp/include/qpid/messaging/Message.h +++ b/cpp/include/qpid/messaging/Message.h @@ -23,6 +23,7 @@ */ #include <string> +#include "qpid/messaging/Duration.h" #include "qpid/messaging/Variant.h" #include "qpid/client/ClientImportExport.h" @@ -67,8 +68,14 @@ class Message QPID_CLIENT_EXTERN void setCorrelationId(const std::string&); QPID_CLIENT_EXTERN const std::string& getCorrelationId() const; - QPID_CLIENT_EXTERN void setTtl(uint64_t ttl); - QPID_CLIENT_EXTERN uint64_t getTtl() const; + /** + * Set the time to live for this message in milliseconds. + */ + QPID_CLIENT_EXTERN void setTtl(Duration ttl); + /** + *Get the time to live for this message in milliseconds. + */ + QPID_CLIENT_EXTERN Duration getTtl() const; QPID_CLIENT_EXTERN void setDurable(bool durable); QPID_CLIENT_EXTERN bool getDurable() const; diff --git a/cpp/include/qpid/messaging/Receiver.h b/cpp/include/qpid/messaging/Receiver.h index 0923178065..bc1f39bfc1 100644 --- a/cpp/include/qpid/messaging/Receiver.h +++ b/cpp/include/qpid/messaging/Receiver.h @@ -24,7 +24,7 @@ #include "qpid/Exception.h" #include "qpid/client/ClientImportExport.h" #include "qpid/client/Handle.h" -#include "qpid/sys/Time.h" +#include "qpid/messaging/Duration.h" namespace qpid { namespace client { @@ -57,14 +57,14 @@ class Receiver : public qpid::client::Handle<ReceiverImpl> * available. Returns false if there is no message to give after * waiting for the specified timeout. */ - QPID_CLIENT_EXTERN bool get(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN bool get(Message& message, Duration timeout=INFINITE_DURATION); /** * Retrieves a message from this receivers local queue, or waits * for upto the specified timeout for a message to become * available. Throws NoMessageAvailable if there is no * message to give after waiting for the specified timeout. */ - QPID_CLIENT_EXTERN Message get(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN Message get(Duration timeout=INFINITE_DURATION); /** * Retrieves a message for this receivers subscription or waits * for upto the specified timeout for one to become @@ -72,7 +72,7 @@ class Receiver : public qpid::client::Handle<ReceiverImpl> * that there is no message for the subscription this receiver is * serving before returning false. */ - QPID_CLIENT_EXTERN bool fetch(Message& message, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN bool fetch(Message& message, Duration timeout=INFINITE_DURATION); /** * Retrieves a message for this receivers subscription or waits * for up to the specified timeout for one to become @@ -80,7 +80,7 @@ class Receiver : public qpid::client::Handle<ReceiverImpl> * that there is no message for the subscription this receiver is * serving before throwing an exception. */ - QPID_CLIENT_EXTERN Message fetch(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN Message fetch(Duration timeout=INFINITE_DURATION); /** * Sets the capacity for the receiver. The capacity determines how * many incoming messages can be held in the receiver before being diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h index 46372cb849..87f69f268a 100644 --- a/cpp/include/qpid/messaging/Session.h +++ b/cpp/include/qpid/messaging/Session.h @@ -22,6 +22,7 @@ * */ #include "qpid/Exception.h" +#include "qpid/messaging/Duration.h" #include "qpid/client/ClientImportExport.h" #include "qpid/client/Handle.h" #include "qpid/sys/Time.h" @@ -100,7 +101,7 @@ class Session : public qpid::client::Handle<SessionImpl> * which case the passed in receiver reference will be set to the * receiver for that message or fals if no message was available. */ - QPID_CLIENT_EXTERN bool nextReceiver(Receiver&, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN bool nextReceiver(Receiver&, Duration timeout=INFINITE_DURATION); /** * Returns the receiver for the next available message. If there * are no available messages at present the call will block for up @@ -108,7 +109,7 @@ class Session : public qpid::client::Handle<SessionImpl> * Receiver::NoMessageAvailable if no message became available in * time. */ - QPID_CLIENT_EXTERN Receiver nextReceiver(qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); + QPID_CLIENT_EXTERN Receiver nextReceiver(Duration timeout=INFINITE_DURATION); /** * Create a new sender through which messages can be sent to the diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index d4ed140476..ba1ffabf5f 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -812,6 +812,7 @@ nobase_include_HEADERS += \ ../include/qpid/messaging/Address.h \ ../include/qpid/messaging/Connection.h \ ../include/qpid/messaging/Codec.h \ + ../include/qpid/messaging/Duration.h \ ../include/qpid/messaging/ListContent.h \ ../include/qpid/messaging/ListView.h \ ../include/qpid/messaging/MapContent.h \ diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 1f5c3162e5..e24f2ba5b4 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -40,28 +40,28 @@ void ReceiverImpl::received(qpid::messaging::Message&) } } -qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout) +qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) { qpid::messaging::Message result; if (!get(result, timeout)) throw Receiver::NoMessageAvailable(); return result; } -qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout) +qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) { qpid::messaging::Message result; if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable(); return result; } -bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout) +bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { Get f(*this, message, timeout); while (!parent.execute(f)) {} return f.result; } -bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) +bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { Fetch f(*this, message, timeout); while (!parent.execute(f)) {} @@ -143,12 +143,12 @@ ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF), state(UNRESOLVED), capacity(0), window(0) {} -bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) +bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { return parent.get(*this, message, timeout); } -bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) +bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) { if (state == CANCELLED) return false;//TODO: or should this be an error? diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index 0dcca3ac07..38aa125ec6 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -27,7 +27,7 @@ #include "qpid/messaging/Variant.h" #include "qpid/client/AsyncSession.h" #include "qpid/client/amqp0_10/SessionImpl.h" -#include "qpid/sys/Time.h" +#include "qpid/messaging/Duration.h" #include <memory> namespace qpid { @@ -50,10 +50,10 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl const qpid::messaging::Address& address); void init(qpid::client::AsyncSession session, AddressResolution& resolver); - bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout); - qpid::messaging::Message get(qpid::sys::Duration timeout); - bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout); - qpid::messaging::Message fetch(qpid::sys::Duration timeout); + bool get(qpid::messaging::Message& message, qpid::messaging::Duration timeout); + qpid::messaging::Message get(qpid::messaging::Duration timeout); + bool fetch(qpid::messaging::Message& message, qpid::messaging::Duration timeout); + qpid::messaging::Message fetch(qpid::messaging::Duration timeout); void close(); void start(); void stop(); @@ -79,8 +79,8 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl void startFlow(); //implementation of public facing methods - bool fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout); - bool getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout); + bool fetchImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout); + bool getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout); void closeImpl(); void setCapacityImpl(uint32_t); @@ -96,10 +96,10 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl struct Get : Command { qpid::messaging::Message& message; - qpid::sys::Duration timeout; + qpid::messaging::Duration timeout; bool result; - Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) : + Get(ReceiverImpl& i, qpid::messaging::Message& m, qpid::messaging::Duration t) : Command(i), message(m), timeout(t), result(false) {} void operator()() { result = impl.getImpl(message, timeout); } }; @@ -107,10 +107,10 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl struct Fetch : Command { qpid::messaging::Message& message; - qpid::sys::Duration timeout; + qpid::messaging::Duration timeout; bool result; - Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::sys::Duration t) : + Fetch(ReceiverImpl& i, qpid::messaging::Message& m, qpid::messaging::Duration t) : Command(i), message(m), timeout(t), result(false) {} void operator()() { result = impl.fetchImpl(message, timeout); } }; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 62af6394b0..9823dba6e1 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -266,24 +266,33 @@ bool SessionImpl::accept(ReceiverImpl* receiver, } } -bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout) +qpid::sys::Duration adjust(qpid::messaging::Duration timeout) { - return incoming.get(handler, timeout); + if (timeout < (uint64_t) (qpid::sys::TIME_INFINITE/qpid::sys::TIME_MSEC)) { + return timeout * qpid::sys::TIME_MSEC; + } else { + return qpid::sys::TIME_INFINITE; + } +} + +bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout) +{ + return incoming.get(handler, adjust(timeout)); } -bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout) +bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout) { IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1)); return getIncoming(handler, timeout); } -bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout) +bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout) { qpid::sys::Mutex::ScopedLock l(lock); while (true) { try { std::string destination; - if (incoming.getNextDestination(destination, timeout)) { + if (incoming.getNextDestination(destination, adjust(timeout))) { Receivers::const_iterator i = receivers.find(destination); if (i == receivers.end()) { throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination)); @@ -300,7 +309,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::D } } -qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::sys::Duration timeout) +qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration timeout) { qpid::messaging::Receiver receiver; if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable(); diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 96c7ca93a3..285c8f031b 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -22,6 +22,7 @@ * */ #include "qpid/messaging/SessionImpl.h" +#include "qpid/messaging/Duration.h" #include "qpid/messaging/Variant.h" #include "qpid/client/Session.h" #include "qpid/client/SubscriptionManager.h" @@ -68,12 +69,12 @@ class SessionImpl : public qpid::messaging::SessionImpl qpid::messaging::Sender getSender(const std::string& name) const; qpid::messaging::Receiver getReceiver(const std::string& name) const; - bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout); - qpid::messaging::Receiver nextReceiver(qpid::sys::Duration timeout); + bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout); + qpid::messaging::Receiver nextReceiver(qpid::messaging::Duration timeout); qpid::messaging::Connection getConnection() const; - bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout); + bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); void receiverCancelled(const std::string& name); void senderCancelled(const std::string& name); @@ -114,7 +115,7 @@ class SessionImpl : public qpid::messaging::SessionImpl const bool transactional; bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&); - bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout); + bool getIncoming(IncomingMessages::Handler& handler, qpid::messaging::Duration timeout); bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); diff --git a/cpp/src/qpid/messaging/Message.cpp b/cpp/src/qpid/messaging/Message.cpp index e4b28c3b2c..4f8a358764 100644 --- a/cpp/src/qpid/messaging/Message.cpp +++ b/cpp/src/qpid/messaging/Message.cpp @@ -50,8 +50,8 @@ const std::string& Message::getUserId() const { return impl->userId; } void Message::setCorrelationId(const std::string& id) { impl->correlationId = id; } const std::string& Message::getCorrelationId() const { return impl->correlationId; } -void Message::setTtl(uint64_t ttl) { impl->ttl = ttl; } -uint64_t Message::getTtl() const { return impl->ttl; } +void Message::setTtl(Duration ttl) { impl->ttl = ttl; } +Duration Message::getTtl() const { return impl->ttl; } void Message::setDurable(bool durable) { impl->durable = durable; } bool Message::getDurable() const { return impl->durable; } diff --git a/cpp/src/qpid/messaging/Receiver.cpp b/cpp/src/qpid/messaging/Receiver.cpp index 478228e0fb..841bee274c 100644 --- a/cpp/src/qpid/messaging/Receiver.cpp +++ b/cpp/src/qpid/messaging/Receiver.cpp @@ -39,10 +39,10 @@ Receiver::Receiver(ReceiverImpl* impl) { PI::ctor(*this, impl); } Receiver::Receiver(const Receiver& s) : qpid::client::Handle<ReceiverImpl>() { PI::copy(*this, s); } Receiver::~Receiver() { PI::dtor(*this); } Receiver& Receiver::operator=(const Receiver& s) { return PI::assign(*this, s); } -bool Receiver::get(Message& message, qpid::sys::Duration timeout) { return impl->get(message, timeout); } -Message Receiver::get(qpid::sys::Duration timeout) { return impl->get(timeout); } -bool Receiver::fetch(Message& message, qpid::sys::Duration timeout) { return impl->fetch(message, timeout); } -Message Receiver::fetch(qpid::sys::Duration timeout) { return impl->fetch(timeout); } +bool Receiver::get(Message& message, Duration timeout) { return impl->get(message, timeout); } +Message Receiver::get(Duration timeout) { return impl->get(timeout); } +bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(message, timeout); } +Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); } void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); } uint32_t Receiver::getCapacity() { return impl->getCapacity(); } uint32_t Receiver::available() { return impl->available(); } diff --git a/cpp/src/qpid/messaging/ReceiverImpl.h b/cpp/src/qpid/messaging/ReceiverImpl.h index 2076de5d56..a720fe8210 100644 --- a/cpp/src/qpid/messaging/ReceiverImpl.h +++ b/cpp/src/qpid/messaging/ReceiverImpl.h @@ -22,7 +22,6 @@ * */ #include "qpid/RefCounted.h" -#include "qpid/sys/Time.h" namespace qpid { namespace client { @@ -38,10 +37,10 @@ class ReceiverImpl : public virtual qpid::RefCounted { public: virtual ~ReceiverImpl() {} - virtual bool get(Message& message, qpid::sys::Duration timeout) = 0; - virtual Message get(qpid::sys::Duration timeout) = 0; - virtual bool fetch(Message& message, qpid::sys::Duration timeout) = 0; - virtual Message fetch(qpid::sys::Duration timeout) = 0; + virtual bool get(Message& message, Duration timeout) = 0; + virtual Message get(Duration timeout) = 0; + virtual bool fetch(Message& message, Duration timeout) = 0; + virtual Message fetch(Duration timeout) = 0; virtual void setCapacity(uint32_t) = 0; virtual uint32_t getCapacity() = 0; virtual uint32_t available() = 0; diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp index 99896caad4..5d1a6fb815 100644 --- a/cpp/src/qpid/messaging/Session.cpp +++ b/cpp/src/qpid/messaging/Session.cpp @@ -76,13 +76,13 @@ void Session::flush() impl->flush(); } -bool Session::nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) +bool Session::nextReceiver(Receiver& receiver, Duration timeout) { return impl->nextReceiver(receiver, timeout); } -Receiver Session::nextReceiver(qpid::sys::Duration timeout) +Receiver Session::nextReceiver(Duration timeout) { return impl->nextReceiver(timeout); } diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h index 164a6f6bc9..653df8fdda 100644 --- a/cpp/src/qpid/messaging/SessionImpl.h +++ b/cpp/src/qpid/messaging/SessionImpl.h @@ -23,7 +23,7 @@ */ #include "qpid/RefCounted.h" #include <string> -#include "qpid/sys/Time.h" +#include "qpid/messaging/Duration.h" namespace qpid { namespace client { @@ -50,8 +50,8 @@ class SessionImpl : public virtual qpid::RefCounted virtual void flush() = 0; virtual Sender createSender(const Address& address) = 0; virtual Receiver createReceiver(const Address& address) = 0; - virtual bool nextReceiver(Receiver& receiver, qpid::sys::Duration timeout) = 0; - virtual Receiver nextReceiver(qpid::sys::Duration timeout) = 0; + virtual bool nextReceiver(Receiver& receiver, Duration timeout) = 0; + virtual Receiver nextReceiver(Duration timeout) = 0; virtual uint32_t available() = 0; virtual uint32_t pendingAck() = 0; virtual Sender getSender(const std::string& name) const = 0; diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 9e6cd59feb..dea98216b4 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -130,7 +130,7 @@ struct MessagingFixture : public BrokerFixture Message out(Uuid(true).str()); s.send(out); Message in; - BOOST_CHECK(r.fetch(in, 5*qpid::sys::TIME_SEC)); + BOOST_CHECK(r.fetch(in, 5*DURATION_SEC)); BOOST_CHECK_EQUAL(out.getContent(), in.getContent()); r.close(); s.close(); @@ -196,7 +196,7 @@ struct MultiQueueFixture : MessagingFixture } }; -std::vector<std::string> fetch(Receiver& receiver, int count, qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5) +std::vector<std::string> fetch(Receiver& receiver, int count, Duration timeout=DURATION_SEC*5) { std::vector<std::string> data; Message message; @@ -215,7 +215,7 @@ void send(Sender& sender, uint count = 1, uint start = 1, const std::string& bas } void receive(Receiver& receiver, uint count = 1, uint start = 1, - const std::string& base = "Message", qpid::sys::Duration timeout=qpid::sys::TIME_SEC*5) + const std::string& base = "Message", Duration timeout=DURATION_SEC*5) { for (uint i = start; i < start + count; ++i) { BOOST_CHECK_EQUAL(receiver.fetch(timeout).getContent(), (boost::format("%1%_%2%") % base % i).str()); @@ -229,7 +229,7 @@ QPID_AUTO_TEST_CASE(testSimpleSendReceive) Message out("test-message"); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + Message in = receiver.fetch(5 * DURATION_SEC); fix.session.acknowledge(); BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); } @@ -246,7 +246,7 @@ QPID_AUTO_TEST_CASE(testSendReceiveHeaders) Receiver receiver = fix.session.createReceiver(fix.queue); Message in; for (uint i = 0; i < 10; ++i) { - BOOST_CHECK(receiver.fetch(in, 5 * qpid::sys::TIME_SEC)); + BOOST_CHECK(receiver.fetch(in, 5 * DURATION_SEC)); BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); BOOST_CHECK_EQUAL(in.getHeaders()["a"].asUint32(), i); fix.session.acknowledge(); @@ -323,7 +323,7 @@ QPID_AUTO_TEST_CASE(testNextReceiver) for (uint i = 0; i < fix.queues.size(); i++) { Message msg; - BOOST_CHECK(fix.session.nextReceiver().fetch(msg, qpid::sys::TIME_SEC)); + BOOST_CHECK(fix.session.nextReceiver().fetch(msg, DURATION_SEC)); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1)).str()); } } @@ -339,7 +339,7 @@ QPID_AUTO_TEST_CASE(testMapMessage) content.encode(); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + Message in = receiver.fetch(5 * DURATION_SEC); MapView view(in); BOOST_CHECK_EQUAL(view["abc"].asString(), "def"); BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f); @@ -358,7 +358,7 @@ QPID_AUTO_TEST_CASE(testMapMessageWithInitial) content.encode(); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + Message in = receiver.fetch(5 * DURATION_SEC); MapView view(in); BOOST_CHECK_EQUAL(view["abc"].asString(), "def"); BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f); @@ -378,7 +378,7 @@ QPID_AUTO_TEST_CASE(testListMessage) content.encode(); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + Message in = receiver.fetch(5 * DURATION_SEC); ListView view(in); BOOST_CHECK_EQUAL(view.size(), content.size()); BOOST_CHECK_EQUAL(view.front().asString(), "abc"); @@ -412,7 +412,7 @@ QPID_AUTO_TEST_CASE(testListMessageWithInitial) content.encode(); sender.send(out); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + Message in = receiver.fetch(5 * DURATION_SEC); ListView view(in); BOOST_CHECK_EQUAL(view.size(), content.size()); BOOST_CHECK_EQUAL(view.front().asString(), "abc"); @@ -441,10 +441,10 @@ QPID_AUTO_TEST_CASE(testReject) Message m2("accept-me"); sender.send(m2); Receiver receiver = fix.session.createReceiver(fix.queue); - Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + Message in = receiver.fetch(5 * DURATION_SEC); BOOST_CHECK_EQUAL(in.getContent(), m1.getContent()); fix.session.reject(in); - in = receiver.fetch(5 * qpid::sys::TIME_SEC); + in = receiver.fetch(5 * DURATION_SEC); BOOST_CHECK_EQUAL(in.getContent(), m2.getContent()); fix.session.acknowledge(); } diff --git a/cpp/src/tests/qpid_recv.cpp b/cpp/src/tests/qpid_recv.cpp index 87a360ec0c..9e4e202053 100644 --- a/cpp/src/tests/qpid_recv.cpp +++ b/cpp/src/tests/qpid_recv.cpp @@ -27,16 +27,12 @@ #include <qpid/Options.h> #include <qpid/log/Logger.h> #include <qpid/log/Options.h> -#include <qpid/sys/Time.h> #include "TestOptions.h" #include <iostream> using namespace qpid::messaging; -using qpid::sys::Duration; -using qpid::sys::TIME_INFINITE; -using qpid::sys::TIME_SEC; using namespace std; @@ -94,8 +90,8 @@ struct Options : public qpid::Options Duration getTimeout() { - if (forever) return TIME_INFINITE; - else return timeout*TIME_SEC; + if (forever) return INFINITE_DURATION; + else return timeout*DURATION_SEC; } bool parse(int argc, char** argv) diff --git a/cpp/src/tests/qpid_stream.cpp b/cpp/src/tests/qpid_stream.cpp index 3cc8e70809..ca21fa248b 100644 --- a/cpp/src/tests/qpid_stream.cpp +++ b/cpp/src/tests/qpid_stream.cpp @@ -32,7 +32,6 @@ #include <string> using namespace qpid::messaging; -using namespace qpid::sys; namespace qpid { namespace tests { @@ -58,13 +57,13 @@ Args opts; const std::string TIMESTAMP = "ts"; -uint64_t timestamp(const AbsTime& time) +uint64_t timestamp(const qpid::sys::AbsTime& time) { - Duration t(time); + qpid::sys::Duration t(time); return t; } -struct Client : Runnable +struct Client : qpid::sys::Runnable { virtual ~Client() {} virtual void doWork(Session&) = 0; @@ -83,9 +82,9 @@ struct Client : Runnable } } - Thread thread; + qpid::sys::Thread thread; - void start() { thread = Thread(this); } + void start() { thread = qpid::sys::Thread(this); } void join() { thread.join(); } }; @@ -95,20 +94,20 @@ struct Publish : Client { Sender sender = session.createSender(opts.address); Message msg; - uint64_t interval = TIME_SEC / opts.rate; + uint64_t interval = qpid::sys::TIME_SEC / opts.rate; uint64_t sent = 0, missedRate = 0; - AbsTime start = now(); + qpid::sys::AbsTime start = qpid::sys::now(); while (true) { - AbsTime sentAt = now(); + qpid::sys::AbsTime sentAt = qpid::sys::now(); msg.getHeaders()[TIMESTAMP] = timestamp(sentAt); sender.send(msg); ++sent; - AbsTime waitTill(start, sent*interval); - Duration delay(sentAt, waitTill); + qpid::sys::AbsTime waitTill(start, sent*interval); + qpid::sys::Duration delay(sentAt, waitTill); if (delay < 0) { ++missedRate; } else { - qpid::sys::usleep(delay / TIME_USEC); + qpid::sys::usleep(delay / qpid::sys::TIME_USEC); } } } @@ -128,9 +127,9 @@ struct Consume : Client session.acknowledge();//TODO: add batching option ++received; //calculate latency - uint64_t receivedAt = timestamp(now()); + uint64_t receivedAt = timestamp(qpid::sys::now()); uint64_t sentAt = msg.getHeaders()[TIMESTAMP].asUint64(); - double latency = ((double) (receivedAt - sentAt)) / TIME_MSEC; + double latency = ((double) (receivedAt - sentAt)) / qpid::sys::TIME_MSEC; //update avg, min & max minLatency = std::min(minLatency, latency); |