diff options
author | Gordon Sim <gsim@apache.org> | 2010-04-09 16:27:35 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-04-09 16:27:35 +0000 |
commit | fcfff56e615c4054d52dc510c9cd1d1103249dce (patch) | |
tree | f07b0f91f1d71e1d8a270238269b70be022f755e | |
parent | a8671b721c82bfa4eb0d3854f8af5ab903637604 (diff) | |
download | qpid-python-fcfff56e615c4054d52dc510c9cd1d1103249dce.tar.gz |
QPID-664: changed pending to unsettled; added available to sender; minor update to address doc
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932490 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/include/qpid/messaging/Address.h | 9 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Receiver.h | 2 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Sender.h | 8 | ||||
-rw-r--r-- | cpp/include/qpid/messaging/Session.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 8 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 32 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Receiver.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/ReceiverImpl.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Sender.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/SenderImpl.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/Session.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/SessionImpl.h | 4 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 18 |
17 files changed, 72 insertions, 58 deletions
diff --git a/cpp/include/qpid/messaging/Address.h b/cpp/include/qpid/messaging/Address.h index 3722db94e8..99e41bb253 100644 --- a/cpp/include/qpid/messaging/Address.h +++ b/cpp/include/qpid/messaging/Address.h @@ -84,9 +84,16 @@ class AddressImpl; * * <tr valign=top> * <td>node</td> + * <td>A nested map describing properties of the addressed * node. Current properties supported are type (topic or queue), - * durable (boolean), x-declare and x-bindings. + * durable (boolean), x-declare and x-bindings. The x-declare + * option is a nested map in whcih protocol amqp 0-10 specific + * options for queue or exchange declare can be specified. The + * x-bindings option is a nested list, each element of which can + * specify a queue, an exchange, a binding-key and arguments, + * which are used to establish a binding on create. The node + * will be used if queue or exchange values are not specified. * </td> * </tr> * diff --git a/cpp/include/qpid/messaging/Receiver.h b/cpp/include/qpid/messaging/Receiver.h index 6926d3401a..738ff82507 100644 --- a/cpp/include/qpid/messaging/Receiver.h +++ b/cpp/include/qpid/messaging/Receiver.h @@ -104,7 +104,7 @@ class Receiver : public qpid::messaging::Handle<ReceiverImpl> * acknowledgement has not yet been confirmed as processed by the * server. */ - QPID_CLIENT_EXTERN uint32_t getPendingAck(); + QPID_CLIENT_EXTERN uint32_t getUnsettled(); /** * Cancels this receiver. diff --git a/cpp/include/qpid/messaging/Sender.h b/cpp/include/qpid/messaging/Sender.h index 7c4b68731e..80fa174d80 100644 --- a/cpp/include/qpid/messaging/Sender.h +++ b/cpp/include/qpid/messaging/Sender.h @@ -70,8 +70,12 @@ class Sender : public qpid::messaging::Handle<SenderImpl> * Returns the number of sent messages pending confirmation of * receipt by the broker. (These are the 'in-doubt' messages). */ - QPID_CLIENT_EXTERN uint32_t getPending(); - + QPID_CLIENT_EXTERN uint32_t getUnsettled(); + /** + * Returns the number of messages for which there is available + * capacity. + */ + QPID_CLIENT_EXTERN uint32_t getAvailable(); /** * Returns the name of this sender. */ diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h index b3bc527329..ac0ea425f6 100644 --- a/cpp/include/qpid/messaging/Session.h +++ b/cpp/include/qpid/messaging/Session.h @@ -96,15 +96,17 @@ class Session : public qpid::messaging::Handle<SessionImpl> /** * Returns the total number of messages received and waiting to be - * fetched by all Receivers belonging to this session. + * fetched by all Receivers belonging to this session. This is the + * total number of available messages across all receivers on this + * session. */ - QPID_CLIENT_EXTERN uint32_t getAvailable(); + QPID_CLIENT_EXTERN uint32_t getReceivable(); /** * Returns a count of the number of messages received this session * that have been acknowledged, but for which that acknowledgement * has not yet been confirmed as processed by the server. */ - QPID_CLIENT_EXTERN uint32_t getPendingAck(); + QPID_CLIENT_EXTERN uint32_t getUnsettledAcks(); /** * Retrieves the receiver for the next available message. If there * are no available messages at present the call will block for up diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index 343b5cad37..b86f142546 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -130,14 +130,14 @@ uint32_t ReceiverImpl::getCapacity() return capacity; } -uint32_t ReceiverImpl::available() +uint32_t ReceiverImpl::getAvailable() { - return parent->available(destination); + return parent->getReceivable(destination); } -uint32_t ReceiverImpl::pendingAck() +uint32_t ReceiverImpl::getUnsettled() { - return parent->pendingAck(destination); + return parent->getUnsettledAcks(destination); } ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index d509490688..e6d11e4bb5 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -60,8 +60,8 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl const std::string& getName() const; void setCapacity(uint32_t); uint32_t getCapacity(); - uint32_t available(); - uint32_t pendingAck(); + uint32_t getAvailable(); + uint32_t getUnsettled(); void received(qpid::messaging::Message& message); qpid::messaging::Session getSession() const; private: diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index 522c93a552..4c41622751 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -58,7 +58,7 @@ void SenderImpl::setCapacity(uint32_t c) execute1<CheckPendingSends>(flush); } uint32_t SenderImpl::getCapacity() { return capacity; } -uint32_t SenderImpl::pending() +uint32_t SenderImpl::getUnsettled() { CheckPendingSends f(*this, false); parent->execute(f); diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h index 7ea68fd187..826c734697 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h @@ -52,7 +52,7 @@ class SenderImpl : public qpid::messaging::SenderImpl void close(); void setCapacity(uint32_t); uint32_t getCapacity(); - uint32_t pending(); + uint32_t getUnsettled(); void init(qpid::client::AsyncSession, AddressResolution&); const std::string& getName() const; qpid::messaging::Session getSession() const; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 33a3e226ff..15a936465b 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -341,25 +341,25 @@ qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration ti return receiver; } -uint32_t SessionImpl::available() +uint32_t SessionImpl::getReceivable() { - return get1<Available, uint32_t>((const std::string*) 0); + return get1<Receivable, uint32_t>((const std::string*) 0); } -uint32_t SessionImpl::available(const std::string& destination) +uint32_t SessionImpl::getReceivable(const std::string& destination) { - return get1<Available, uint32_t>(&destination); + return get1<Receivable, uint32_t>(&destination); } -struct SessionImpl::Available : Command +struct SessionImpl::Receivable : Command { const std::string* destination; uint32_t result; - Available(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} - void operator()() { result = impl.availableImpl(destination); } + Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} + void operator()() { result = impl.getReceivableImpl(destination); } }; -uint32_t SessionImpl::availableImpl(const std::string* destination) +uint32_t SessionImpl::getReceivableImpl(const std::string* destination) { if (destination) { return incoming.available(*destination); @@ -368,26 +368,26 @@ uint32_t SessionImpl::availableImpl(const std::string* destination) } } -uint32_t SessionImpl::pendingAck() +uint32_t SessionImpl::getUnsettledAcks() { - return get1<PendingAck, uint32_t>((const std::string*) 0); + return get1<UnsettledAcks, uint32_t>((const std::string*) 0); } -uint32_t SessionImpl::pendingAck(const std::string& destination) +uint32_t SessionImpl::getUnsettledAcks(const std::string& destination) { - return get1<PendingAck, uint32_t>(&destination); + return get1<UnsettledAcks, uint32_t>(&destination); } -struct SessionImpl::PendingAck : Command +struct SessionImpl::UnsettledAcks : Command { const std::string* destination; uint32_t result; - PendingAck(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} - void operator()() { result = impl.pendingAckImpl(destination); } + UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} + void operator()() { result = impl.getUnsettledAcksImpl(destination); } }; -uint32_t SessionImpl::pendingAckImpl(const std::string* destination) +uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination) { if (destination) { return incoming.pendingAccept(*destination); diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index e1229055f7..0613074f7c 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -82,11 +82,11 @@ class SessionImpl : public qpid::messaging::SessionImpl void receiverCancelled(const std::string& name); void senderCancelled(const std::string& name); - uint32_t available(); - uint32_t available(const std::string& destination); + uint32_t getReceivable(); + uint32_t getReceivable(const std::string& destination); - uint32_t pendingAck(); - uint32_t pendingAck(const std::string& destination); + uint32_t getUnsettledAcks(); + uint32_t getUnsettledAcks(const std::string& destination); void setSession(qpid::client::Session); @@ -143,8 +143,8 @@ class SessionImpl : public qpid::messaging::SessionImpl void syncImpl(bool block); qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address); qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address); - uint32_t availableImpl(const std::string* destination); - uint32_t pendingAckImpl(const std::string* destination); + uint32_t getReceivableImpl(const std::string* destination); + uint32_t getUnsettledAcksImpl(const std::string* destination); //functors for public facing methods (allows locking and retry //logic to be centralised) @@ -203,8 +203,8 @@ class SessionImpl : public qpid::messaging::SessionImpl struct CreateSender; struct CreateReceiver; - struct PendingAck; - struct Available; + struct UnsettledAcks; + struct Receivable; //helper templates for some common patterns template <class F> bool execute() diff --git a/cpp/src/qpid/messaging/Receiver.cpp b/cpp/src/qpid/messaging/Receiver.cpp index ff67650cf8..552c1db16c 100644 --- a/cpp/src/qpid/messaging/Receiver.cpp +++ b/cpp/src/qpid/messaging/Receiver.cpp @@ -39,8 +39,8 @@ bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(me Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); } void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); } uint32_t Receiver::getCapacity() { return impl->getCapacity(); } -uint32_t Receiver::getAvailable() { return impl->available(); } -uint32_t Receiver::getPendingAck() { return impl->pendingAck(); } +uint32_t Receiver::getAvailable() { return impl->getAvailable(); } +uint32_t Receiver::getUnsettled() { return impl->getUnsettled(); } void Receiver::close() { impl->close(); } const std::string& Receiver::getName() const { return impl->getName(); } Session Receiver::getSession() const { return impl->getSession(); } diff --git a/cpp/src/qpid/messaging/ReceiverImpl.h b/cpp/src/qpid/messaging/ReceiverImpl.h index c156265f6c..07da0636f7 100644 --- a/cpp/src/qpid/messaging/ReceiverImpl.h +++ b/cpp/src/qpid/messaging/ReceiverImpl.h @@ -40,8 +40,8 @@ class ReceiverImpl : public virtual qpid::RefCounted virtual Message fetch(Duration timeout) = 0; virtual void setCapacity(uint32_t) = 0; virtual uint32_t getCapacity() = 0; - virtual uint32_t available() = 0; - virtual uint32_t pendingAck() = 0; + virtual uint32_t getAvailable() = 0; + virtual uint32_t getUnsettled() = 0; virtual void close() = 0; virtual const std::string& getName() const = 0; virtual Session getSession() const = 0; diff --git a/cpp/src/qpid/messaging/Sender.cpp b/cpp/src/qpid/messaging/Sender.cpp index b4c247d1d9..53dbb69777 100644 --- a/cpp/src/qpid/messaging/Sender.cpp +++ b/cpp/src/qpid/messaging/Sender.cpp @@ -36,7 +36,8 @@ void Sender::send(const Message& message, bool sync) { impl->send(message, sync) void Sender::close() { impl->close(); } void Sender::setCapacity(uint32_t c) { impl->setCapacity(c); } uint32_t Sender::getCapacity() { return impl->getCapacity(); } -uint32_t Sender::getPending() { return impl->pending(); } +uint32_t Sender::getUnsettled() { return impl->getUnsettled(); } +uint32_t Sender::getAvailable() { return getCapacity() - getUnsettled(); } const std::string& Sender::getName() const { return impl->getName(); } Session Sender::getSession() const { return impl->getSession(); } diff --git a/cpp/src/qpid/messaging/SenderImpl.h b/cpp/src/qpid/messaging/SenderImpl.h index 66651a16dc..a1ca02c72c 100644 --- a/cpp/src/qpid/messaging/SenderImpl.h +++ b/cpp/src/qpid/messaging/SenderImpl.h @@ -37,7 +37,7 @@ class SenderImpl : public virtual qpid::RefCounted virtual void close() = 0; virtual void setCapacity(uint32_t) = 0; virtual uint32_t getCapacity() = 0; - virtual uint32_t pending() = 0; + virtual uint32_t getUnsettled() = 0; virtual const std::string& getName() const = 0; virtual Session getSession() const = 0; private: diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp index bd9048e893..ddbb9ee58e 100644 --- a/cpp/src/qpid/messaging/Session.cpp +++ b/cpp/src/qpid/messaging/Session.cpp @@ -77,8 +77,8 @@ Receiver Session::nextReceiver(Duration timeout) return impl->nextReceiver(timeout); } -uint32_t Session::getAvailable() { return impl->available(); } -uint32_t Session::getPendingAck() { return impl->pendingAck(); } +uint32_t Session::getReceivable() { return impl->getReceivable(); } +uint32_t Session::getUnsettledAcks() { return impl->getUnsettledAcks(); } Sender Session::getSender(const std::string& name) const { diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h index e9a200b22e..de050fc5bb 100644 --- a/cpp/src/qpid/messaging/SessionImpl.h +++ b/cpp/src/qpid/messaging/SessionImpl.h @@ -49,8 +49,8 @@ class SessionImpl : public virtual qpid::RefCounted virtual Receiver createReceiver(const Address& address) = 0; virtual bool nextReceiver(Receiver& receiver, Duration timeout) = 0; virtual Receiver nextReceiver(Duration timeout) = 0; - virtual uint32_t available() = 0; - virtual uint32_t pendingAck() = 0; + virtual uint32_t getReceivable() = 0; + virtual uint32_t getUnsettledAcks() = 0; virtual Sender getSender(const std::string& name) const = 0; virtual Receiver getReceiver(const std::string& name) const = 0; virtual Connection getConnection() const = 0; diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index aa0c65d319..d134793b82 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -491,7 +491,7 @@ QPID_AUTO_TEST_CASE(testAvailable) } qpid::sys::sleep(1);//is there any avoid an arbitrary sleep while waiting for messages to be dispatched? for (uint i = 0; i < 5; ++i) { - BOOST_CHECK_EQUAL(fix.session.getAvailable(), 15u - 2*i); + BOOST_CHECK_EQUAL(fix.session.getReceivable(), 15u - 2*i); BOOST_CHECK_EQUAL(r1.getAvailable(), 10u - i); BOOST_CHECK_EQUAL(r1.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str()); BOOST_CHECK_EQUAL(r2.getAvailable(), 5u - i); @@ -499,13 +499,13 @@ QPID_AUTO_TEST_CASE(testAvailable) fix.session.acknowledge(); } for (uint i = 5; i < 10; ++i) { - BOOST_CHECK_EQUAL(fix.session.getAvailable(), 10u - i); + BOOST_CHECK_EQUAL(fix.session.getReceivable(), 10u - i); BOOST_CHECK_EQUAL(r1.getAvailable(), 10u - i); BOOST_CHECK_EQUAL(r1.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str()); } } -QPID_AUTO_TEST_CASE(testPendingAck) +QPID_AUTO_TEST_CASE(testUnsettledAcks) { QueueFixture fix; Sender sender = fix.session.createSender(fix.queue); @@ -516,14 +516,14 @@ QPID_AUTO_TEST_CASE(testPendingAck) for (uint i = 0; i < 10; ++i) { BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % (i+1)).str()); } - BOOST_CHECK_EQUAL(fix.session.getPendingAck(), 0u); + BOOST_CHECK_EQUAL(fix.session.getUnsettledAcks(), 0u); fix.session.acknowledge(); - BOOST_CHECK_EQUAL(fix.session.getPendingAck(), 10u); + BOOST_CHECK_EQUAL(fix.session.getUnsettledAcks(), 10u); fix.session.sync(); - BOOST_CHECK_EQUAL(fix.session.getPendingAck(), 0u); + BOOST_CHECK_EQUAL(fix.session.getUnsettledAcks(), 0u); } -QPID_AUTO_TEST_CASE(testPendingSend) +QPID_AUTO_TEST_CASE(testUnsettledSend) { QueueFixture fix; Sender sender = fix.session.createSender(fix.queue); @@ -532,9 +532,9 @@ QPID_AUTO_TEST_CASE(testPendingSend) //implementation and the fact that the simple test case makes it //possible to predict when completion information will be sent to //the client. TODO: is there a better way of testing this? - BOOST_CHECK_EQUAL(sender.getPending(), 10u); + BOOST_CHECK_EQUAL(sender.getUnsettled(), 10u); fix.session.sync(); - BOOST_CHECK_EQUAL(sender.getPending(), 0u); + BOOST_CHECK_EQUAL(sender.getUnsettled(), 0u); Receiver receiver = fix.session.createReceiver(fix.queue); receive(receiver, 10); |