summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-04-09 16:27:35 +0000
committerGordon Sim <gsim@apache.org>2010-04-09 16:27:35 +0000
commitfcfff56e615c4054d52dc510c9cd1d1103249dce (patch)
treef07b0f91f1d71e1d8a270238269b70be022f755e /cpp
parenta8671b721c82bfa4eb0d3854f8af5ab903637604 (diff)
downloadqpid-python-fcfff56e615c4054d52dc510c9cd1d1103249dce.tar.gz
QPID-664: changed pending to unsettled; added available to sender; minor update to address doc
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932490 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/qpid/messaging/Address.h9
-rw-r--r--cpp/include/qpid/messaging/Receiver.h2
-rw-r--r--cpp/include/qpid/messaging/Sender.h8
-rw-r--r--cpp/include/qpid/messaging/Session.h8
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp8
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.h4
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp2
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.h2
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp32
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h16
-rw-r--r--cpp/src/qpid/messaging/Receiver.cpp4
-rw-r--r--cpp/src/qpid/messaging/ReceiverImpl.h4
-rw-r--r--cpp/src/qpid/messaging/Sender.cpp3
-rw-r--r--cpp/src/qpid/messaging/SenderImpl.h2
-rw-r--r--cpp/src/qpid/messaging/Session.cpp4
-rw-r--r--cpp/src/qpid/messaging/SessionImpl.h4
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp18
17 files changed, 72 insertions, 58 deletions
diff --git a/cpp/include/qpid/messaging/Address.h b/cpp/include/qpid/messaging/Address.h
index 3722db94e8..99e41bb253 100644
--- a/cpp/include/qpid/messaging/Address.h
+++ b/cpp/include/qpid/messaging/Address.h
@@ -84,9 +84,16 @@ class AddressImpl;
*
* <tr valign=top>
* <td>node</td>
+
* <td>A nested map describing properties of the addressed
* node. Current properties supported are type (topic or queue),
- * durable (boolean), x-declare and x-bindings.
+ * durable (boolean), x-declare and x-bindings. The x-declare
+ * option is a nested map in whcih protocol amqp 0-10 specific
+ * options for queue or exchange declare can be specified. The
+ * x-bindings option is a nested list, each element of which can
+ * specify a queue, an exchange, a binding-key and arguments,
+ * which are used to establish a binding on create. The node
+ * will be used if queue or exchange values are not specified.
* </td>
* </tr>
*
diff --git a/cpp/include/qpid/messaging/Receiver.h b/cpp/include/qpid/messaging/Receiver.h
index 6926d3401a..738ff82507 100644
--- a/cpp/include/qpid/messaging/Receiver.h
+++ b/cpp/include/qpid/messaging/Receiver.h
@@ -104,7 +104,7 @@ class Receiver : public qpid::messaging::Handle<ReceiverImpl>
* acknowledgement has not yet been confirmed as processed by the
* server.
*/
- QPID_CLIENT_EXTERN uint32_t getPendingAck();
+ QPID_CLIENT_EXTERN uint32_t getUnsettled();
/**
* Cancels this receiver.
diff --git a/cpp/include/qpid/messaging/Sender.h b/cpp/include/qpid/messaging/Sender.h
index 7c4b68731e..80fa174d80 100644
--- a/cpp/include/qpid/messaging/Sender.h
+++ b/cpp/include/qpid/messaging/Sender.h
@@ -70,8 +70,12 @@ class Sender : public qpid::messaging::Handle<SenderImpl>
* Returns the number of sent messages pending confirmation of
* receipt by the broker. (These are the 'in-doubt' messages).
*/
- QPID_CLIENT_EXTERN uint32_t getPending();
-
+ QPID_CLIENT_EXTERN uint32_t getUnsettled();
+ /**
+ * Returns the number of messages for which there is available
+ * capacity.
+ */
+ QPID_CLIENT_EXTERN uint32_t getAvailable();
/**
* Returns the name of this sender.
*/
diff --git a/cpp/include/qpid/messaging/Session.h b/cpp/include/qpid/messaging/Session.h
index b3bc527329..ac0ea425f6 100644
--- a/cpp/include/qpid/messaging/Session.h
+++ b/cpp/include/qpid/messaging/Session.h
@@ -96,15 +96,17 @@ class Session : public qpid::messaging::Handle<SessionImpl>
/**
* Returns the total number of messages received and waiting to be
- * fetched by all Receivers belonging to this session.
+ * fetched by all Receivers belonging to this session. This is the
+ * total number of available messages across all receivers on this
+ * session.
*/
- QPID_CLIENT_EXTERN uint32_t getAvailable();
+ QPID_CLIENT_EXTERN uint32_t getReceivable();
/**
* Returns a count of the number of messages received this session
* that have been acknowledged, but for which that acknowledgement
* has not yet been confirmed as processed by the server.
*/
- QPID_CLIENT_EXTERN uint32_t getPendingAck();
+ QPID_CLIENT_EXTERN uint32_t getUnsettledAcks();
/**
* Retrieves the receiver for the next available message. If there
* are no available messages at present the call will block for up
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index 343b5cad37..b86f142546 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -130,14 +130,14 @@ uint32_t ReceiverImpl::getCapacity()
return capacity;
}
-uint32_t ReceiverImpl::available()
+uint32_t ReceiverImpl::getAvailable()
{
- return parent->available(destination);
+ return parent->getReceivable(destination);
}
-uint32_t ReceiverImpl::pendingAck()
+uint32_t ReceiverImpl::getUnsettled()
{
- return parent->pendingAck(destination);
+ return parent->getUnsettledAcks(destination);
}
ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
index d509490688..e6d11e4bb5 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h
@@ -60,8 +60,8 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl
const std::string& getName() const;
void setCapacity(uint32_t);
uint32_t getCapacity();
- uint32_t available();
- uint32_t pendingAck();
+ uint32_t getAvailable();
+ uint32_t getUnsettled();
void received(qpid::messaging::Message& message);
qpid::messaging::Session getSession() const;
private:
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index 522c93a552..4c41622751 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -58,7 +58,7 @@ void SenderImpl::setCapacity(uint32_t c)
execute1<CheckPendingSends>(flush);
}
uint32_t SenderImpl::getCapacity() { return capacity; }
-uint32_t SenderImpl::pending()
+uint32_t SenderImpl::getUnsettled()
{
CheckPendingSends f(*this, false);
parent->execute(f);
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.h b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
index 7ea68fd187..826c734697 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.h
@@ -52,7 +52,7 @@ class SenderImpl : public qpid::messaging::SenderImpl
void close();
void setCapacity(uint32_t);
uint32_t getCapacity();
- uint32_t pending();
+ uint32_t getUnsettled();
void init(qpid::client::AsyncSession, AddressResolution&);
const std::string& getName() const;
qpid::messaging::Session getSession() const;
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 33a3e226ff..15a936465b 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -341,25 +341,25 @@ qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration ti
return receiver;
}
-uint32_t SessionImpl::available()
+uint32_t SessionImpl::getReceivable()
{
- return get1<Available, uint32_t>((const std::string*) 0);
+ return get1<Receivable, uint32_t>((const std::string*) 0);
}
-uint32_t SessionImpl::available(const std::string& destination)
+uint32_t SessionImpl::getReceivable(const std::string& destination)
{
- return get1<Available, uint32_t>(&destination);
+ return get1<Receivable, uint32_t>(&destination);
}
-struct SessionImpl::Available : Command
+struct SessionImpl::Receivable : Command
{
const std::string* destination;
uint32_t result;
- Available(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
- void operator()() { result = impl.availableImpl(destination); }
+ Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+ void operator()() { result = impl.getReceivableImpl(destination); }
};
-uint32_t SessionImpl::availableImpl(const std::string* destination)
+uint32_t SessionImpl::getReceivableImpl(const std::string* destination)
{
if (destination) {
return incoming.available(*destination);
@@ -368,26 +368,26 @@ uint32_t SessionImpl::availableImpl(const std::string* destination)
}
}
-uint32_t SessionImpl::pendingAck()
+uint32_t SessionImpl::getUnsettledAcks()
{
- return get1<PendingAck, uint32_t>((const std::string*) 0);
+ return get1<UnsettledAcks, uint32_t>((const std::string*) 0);
}
-uint32_t SessionImpl::pendingAck(const std::string& destination)
+uint32_t SessionImpl::getUnsettledAcks(const std::string& destination)
{
- return get1<PendingAck, uint32_t>(&destination);
+ return get1<UnsettledAcks, uint32_t>(&destination);
}
-struct SessionImpl::PendingAck : Command
+struct SessionImpl::UnsettledAcks : Command
{
const std::string* destination;
uint32_t result;
- PendingAck(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
- void operator()() { result = impl.pendingAckImpl(destination); }
+ UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
+ void operator()() { result = impl.getUnsettledAcksImpl(destination); }
};
-uint32_t SessionImpl::pendingAckImpl(const std::string* destination)
+uint32_t SessionImpl::getUnsettledAcksImpl(const std::string* destination)
{
if (destination) {
return incoming.pendingAccept(*destination);
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index e1229055f7..0613074f7c 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -82,11 +82,11 @@ class SessionImpl : public qpid::messaging::SessionImpl
void receiverCancelled(const std::string& name);
void senderCancelled(const std::string& name);
- uint32_t available();
- uint32_t available(const std::string& destination);
+ uint32_t getReceivable();
+ uint32_t getReceivable(const std::string& destination);
- uint32_t pendingAck();
- uint32_t pendingAck(const std::string& destination);
+ uint32_t getUnsettledAcks();
+ uint32_t getUnsettledAcks(const std::string& destination);
void setSession(qpid::client::Session);
@@ -143,8 +143,8 @@ class SessionImpl : public qpid::messaging::SessionImpl
void syncImpl(bool block);
qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address);
qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address);
- uint32_t availableImpl(const std::string* destination);
- uint32_t pendingAckImpl(const std::string* destination);
+ uint32_t getReceivableImpl(const std::string* destination);
+ uint32_t getUnsettledAcksImpl(const std::string* destination);
//functors for public facing methods (allows locking and retry
//logic to be centralised)
@@ -203,8 +203,8 @@ class SessionImpl : public qpid::messaging::SessionImpl
struct CreateSender;
struct CreateReceiver;
- struct PendingAck;
- struct Available;
+ struct UnsettledAcks;
+ struct Receivable;
//helper templates for some common patterns
template <class F> bool execute()
diff --git a/cpp/src/qpid/messaging/Receiver.cpp b/cpp/src/qpid/messaging/Receiver.cpp
index ff67650cf8..552c1db16c 100644
--- a/cpp/src/qpid/messaging/Receiver.cpp
+++ b/cpp/src/qpid/messaging/Receiver.cpp
@@ -39,8 +39,8 @@ bool Receiver::fetch(Message& message, Duration timeout) { return impl->fetch(me
Message Receiver::fetch(Duration timeout) { return impl->fetch(timeout); }
void Receiver::setCapacity(uint32_t c) { impl->setCapacity(c); }
uint32_t Receiver::getCapacity() { return impl->getCapacity(); }
-uint32_t Receiver::getAvailable() { return impl->available(); }
-uint32_t Receiver::getPendingAck() { return impl->pendingAck(); }
+uint32_t Receiver::getAvailable() { return impl->getAvailable(); }
+uint32_t Receiver::getUnsettled() { return impl->getUnsettled(); }
void Receiver::close() { impl->close(); }
const std::string& Receiver::getName() const { return impl->getName(); }
Session Receiver::getSession() const { return impl->getSession(); }
diff --git a/cpp/src/qpid/messaging/ReceiverImpl.h b/cpp/src/qpid/messaging/ReceiverImpl.h
index c156265f6c..07da0636f7 100644
--- a/cpp/src/qpid/messaging/ReceiverImpl.h
+++ b/cpp/src/qpid/messaging/ReceiverImpl.h
@@ -40,8 +40,8 @@ class ReceiverImpl : public virtual qpid::RefCounted
virtual Message fetch(Duration timeout) = 0;
virtual void setCapacity(uint32_t) = 0;
virtual uint32_t getCapacity() = 0;
- virtual uint32_t available() = 0;
- virtual uint32_t pendingAck() = 0;
+ virtual uint32_t getAvailable() = 0;
+ virtual uint32_t getUnsettled() = 0;
virtual void close() = 0;
virtual const std::string& getName() const = 0;
virtual Session getSession() const = 0;
diff --git a/cpp/src/qpid/messaging/Sender.cpp b/cpp/src/qpid/messaging/Sender.cpp
index b4c247d1d9..53dbb69777 100644
--- a/cpp/src/qpid/messaging/Sender.cpp
+++ b/cpp/src/qpid/messaging/Sender.cpp
@@ -36,7 +36,8 @@ void Sender::send(const Message& message, bool sync) { impl->send(message, sync)
void Sender::close() { impl->close(); }
void Sender::setCapacity(uint32_t c) { impl->setCapacity(c); }
uint32_t Sender::getCapacity() { return impl->getCapacity(); }
-uint32_t Sender::getPending() { return impl->pending(); }
+uint32_t Sender::getUnsettled() { return impl->getUnsettled(); }
+uint32_t Sender::getAvailable() { return getCapacity() - getUnsettled(); }
const std::string& Sender::getName() const { return impl->getName(); }
Session Sender::getSession() const { return impl->getSession(); }
diff --git a/cpp/src/qpid/messaging/SenderImpl.h b/cpp/src/qpid/messaging/SenderImpl.h
index 66651a16dc..a1ca02c72c 100644
--- a/cpp/src/qpid/messaging/SenderImpl.h
+++ b/cpp/src/qpid/messaging/SenderImpl.h
@@ -37,7 +37,7 @@ class SenderImpl : public virtual qpid::RefCounted
virtual void close() = 0;
virtual void setCapacity(uint32_t) = 0;
virtual uint32_t getCapacity() = 0;
- virtual uint32_t pending() = 0;
+ virtual uint32_t getUnsettled() = 0;
virtual const std::string& getName() const = 0;
virtual Session getSession() const = 0;
private:
diff --git a/cpp/src/qpid/messaging/Session.cpp b/cpp/src/qpid/messaging/Session.cpp
index bd9048e893..ddbb9ee58e 100644
--- a/cpp/src/qpid/messaging/Session.cpp
+++ b/cpp/src/qpid/messaging/Session.cpp
@@ -77,8 +77,8 @@ Receiver Session::nextReceiver(Duration timeout)
return impl->nextReceiver(timeout);
}
-uint32_t Session::getAvailable() { return impl->available(); }
-uint32_t Session::getPendingAck() { return impl->pendingAck(); }
+uint32_t Session::getReceivable() { return impl->getReceivable(); }
+uint32_t Session::getUnsettledAcks() { return impl->getUnsettledAcks(); }
Sender Session::getSender(const std::string& name) const
{
diff --git a/cpp/src/qpid/messaging/SessionImpl.h b/cpp/src/qpid/messaging/SessionImpl.h
index e9a200b22e..de050fc5bb 100644
--- a/cpp/src/qpid/messaging/SessionImpl.h
+++ b/cpp/src/qpid/messaging/SessionImpl.h
@@ -49,8 +49,8 @@ class SessionImpl : public virtual qpid::RefCounted
virtual Receiver createReceiver(const Address& address) = 0;
virtual bool nextReceiver(Receiver& receiver, Duration timeout) = 0;
virtual Receiver nextReceiver(Duration timeout) = 0;
- virtual uint32_t available() = 0;
- virtual uint32_t pendingAck() = 0;
+ virtual uint32_t getReceivable() = 0;
+ virtual uint32_t getUnsettledAcks() = 0;
virtual Sender getSender(const std::string& name) const = 0;
virtual Receiver getReceiver(const std::string& name) const = 0;
virtual Connection getConnection() const = 0;
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index aa0c65d319..d134793b82 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -491,7 +491,7 @@ QPID_AUTO_TEST_CASE(testAvailable)
}
qpid::sys::sleep(1);//is there any avoid an arbitrary sleep while waiting for messages to be dispatched?
for (uint i = 0; i < 5; ++i) {
- BOOST_CHECK_EQUAL(fix.session.getAvailable(), 15u - 2*i);
+ BOOST_CHECK_EQUAL(fix.session.getReceivable(), 15u - 2*i);
BOOST_CHECK_EQUAL(r1.getAvailable(), 10u - i);
BOOST_CHECK_EQUAL(r1.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str());
BOOST_CHECK_EQUAL(r2.getAvailable(), 5u - i);
@@ -499,13 +499,13 @@ QPID_AUTO_TEST_CASE(testAvailable)
fix.session.acknowledge();
}
for (uint i = 5; i < 10; ++i) {
- BOOST_CHECK_EQUAL(fix.session.getAvailable(), 10u - i);
+ BOOST_CHECK_EQUAL(fix.session.getReceivable(), 10u - i);
BOOST_CHECK_EQUAL(r1.getAvailable(), 10u - i);
BOOST_CHECK_EQUAL(r1.fetch().getContent(), (boost::format("A_%1%") % (i+1)).str());
}
}
-QPID_AUTO_TEST_CASE(testPendingAck)
+QPID_AUTO_TEST_CASE(testUnsettledAcks)
{
QueueFixture fix;
Sender sender = fix.session.createSender(fix.queue);
@@ -516,14 +516,14 @@ QPID_AUTO_TEST_CASE(testPendingAck)
for (uint i = 0; i < 10; ++i) {
BOOST_CHECK_EQUAL(receiver.fetch().getContent(), (boost::format("Message_%1%") % (i+1)).str());
}
- BOOST_CHECK_EQUAL(fix.session.getPendingAck(), 0u);
+ BOOST_CHECK_EQUAL(fix.session.getUnsettledAcks(), 0u);
fix.session.acknowledge();
- BOOST_CHECK_EQUAL(fix.session.getPendingAck(), 10u);
+ BOOST_CHECK_EQUAL(fix.session.getUnsettledAcks(), 10u);
fix.session.sync();
- BOOST_CHECK_EQUAL(fix.session.getPendingAck(), 0u);
+ BOOST_CHECK_EQUAL(fix.session.getUnsettledAcks(), 0u);
}
-QPID_AUTO_TEST_CASE(testPendingSend)
+QPID_AUTO_TEST_CASE(testUnsettledSend)
{
QueueFixture fix;
Sender sender = fix.session.createSender(fix.queue);
@@ -532,9 +532,9 @@ QPID_AUTO_TEST_CASE(testPendingSend)
//implementation and the fact that the simple test case makes it
//possible to predict when completion information will be sent to
//the client. TODO: is there a better way of testing this?
- BOOST_CHECK_EQUAL(sender.getPending(), 10u);
+ BOOST_CHECK_EQUAL(sender.getUnsettled(), 10u);
fix.session.sync();
- BOOST_CHECK_EQUAL(sender.getPending(), 0u);
+ BOOST_CHECK_EQUAL(sender.getUnsettled(), 0u);
Receiver receiver = fix.session.createReceiver(fix.queue);
receive(receiver, 10);