summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-03-07 13:20:02 +0000
committerGordon Sim <gsim@apache.org>2008-03-07 13:20:02 +0000
commit5d8a9df4ec3a4f030ed80e143ce6986c19ab800a (patch)
tree8417c3abe9dd81e6a73084aa36371981e06f9e27 /cpp/src
parent9fd4909832e16734c47c13eebbe4aca66640b1b0 (diff)
downloadqpid-python-5d8a9df4ec3a4f030ed80e143ce6986c19ab800a.tar.gz
Altered management of delivery records to support separateion of completion (which drives flow control) and acceptance.
Converted flow control python tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Consumer.h2
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp64
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h23
-rw-r--r--cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp13
-rw-r--r--cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp24
-rw-r--r--cpp/src/qpid/broker/SemanticState.h4
7 files changed, 86 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index ed4bb176f6..65c60182b8 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -35,7 +35,7 @@ namespace qpid {
{
intrusive_ptr<Message> payload;
framing::SequenceNumber position;
- Queue* queue;
+ Queue* queue;
QueuedMessage(Queue* q, intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
payload(msg), position(sn), queue(q) {}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 154394e5de..ca90f32a5d 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -32,16 +32,20 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
const std::string _tag,
DeliveryToken::shared_ptr _token,
const DeliveryId _id,
- bool _acquired, bool _confirmed) : msg(_msg),
- queue(_queue),
- tag(_tag),
- token(_token),
- id(_id),
- acquired(_acquired),
- confirmed(_confirmed),
- pull(false),
- cancelled(false)
+ bool _acquired, bool accepted) : msg(_msg),
+ queue(_queue),
+ tag(_tag),
+ token(_token),
+ id(_id),
+ acquired(_acquired),
+ pull(false),
+ cancelled(false),
+ credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
+ size(msg.payload ? msg.payload->contentSize() : 0),
+ completed(false),
+ ended(accepted)
{
+ if (accepted) setEnded();
}
DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
@@ -50,14 +54,23 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
queue(_queue),
id(_id),
acquired(true),
- confirmed(false),
pull(true),
- cancelled(false)
+ cancelled(false),
+ credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
+ size(msg.payload ? msg.payload->contentSize() : 0),
+ completed(false),
+ ended(false)
{}
+void DeliveryRecord::setEnded()
+{
+ ended = true;
+ //reset msg pointer, don't need to hold on to it anymore
+ msg.payload = boost::intrusive_ptr<Message>();
+}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
- if (acquired && !confirmed) {
+ if (acquired && !ended) {
queue->dequeue(ctxt, msg.payload);
}
}
@@ -79,7 +92,7 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const
}
void DeliveryRecord::redeliver(SemanticState* const session) {
- if (!confirmed) {
+ if (!ended) {
if(pull || cancelled){
//if message was originally sent as response to get, we must requeue it
@@ -96,7 +109,7 @@ void DeliveryRecord::redeliver(SemanticState* const session) {
void DeliveryRecord::requeue() const
{
- if (acquired && !confirmed) {
+ if (acquired && !ended) {
msg.payload->redeliver();
queue->requeue(msg);
}
@@ -104,9 +117,22 @@ void DeliveryRecord::requeue() const
void DeliveryRecord::release()
{
- if (acquired && !confirmed) {
+ if (acquired && !ended) {
queue->requeue(msg);
acquired = false;
+ setEnded();
+ }
+}
+
+void DeliveryRecord::complete()
+{
+ completed = true;
+}
+
+void DeliveryRecord::accept(TransactionContext* ctxt) {
+ if (acquired && !ended) {
+ queue->dequeue(ctxt, msg.payload);
+ setEnded();
}
}
@@ -124,9 +150,9 @@ void DeliveryRecord::reject()
}
}
-void DeliveryRecord::updateByteCredit(uint32_t& credit) const
+uint32_t DeliveryRecord::getCredit() const
{
- credit += msg.payload->getRequiredCredit();
+ return credit;
}
@@ -134,7 +160,7 @@ void DeliveryRecord::addTo(Prefetch& prefetch) const{
if(!pull){
//ignore 'pulled' messages (i.e. those that were sent in
//response to get) when calculating prefetch
- prefetch.size += msg.payload->contentSize();
+ prefetch.size += size;
prefetch.count++;
}
}
@@ -143,7 +169,7 @@ void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{
if(!pull){
//ignore 'pulled' messages (i.e. those that were sent in
//response to get) when calculating prefetch
- prefetch.size -= msg.payload->contentSize();
+ prefetch.size -= size;
prefetch.count--;
}
}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index eeb363bcfc..b2672345b4 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -47,32 +47,45 @@ class DeliveryRecord{
DeliveryToken::shared_ptr token;
DeliveryId id;
bool acquired;
- const bool confirmed;
const bool pull;
bool cancelled;
+ const uint32_t credit;
+ const uint64_t size;
+
+ bool completed;
+ bool ended;
+
+ void setEnded();
public:
DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
const DeliveryId id, bool acquired, bool confirmed = false);
DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id);
- void dequeue(TransactionContext* ctxt = 0) const;
bool matches(DeliveryId tag) const;
bool matchOrAfter(DeliveryId tag) const;
bool after(DeliveryId tag) const;
bool coveredBy(const framing::AccumulatedAck* const range) const;
+
+ void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
void release();
void reject();
void cancel(const std::string& tag);
void redeliver(SemanticState* const);
- void updateByteCredit(uint32_t& credit) const;
+ void acquire(DeliveryIds& results);
+ void complete();
+ void accept(TransactionContext* ctxt);
+
+ bool isAcquired() const { return acquired; }
+ bool isComplete() const { return completed; }
+ bool isRedundant() const { return ended && completed; }
+
+ uint32_t getCredit() const;
void addTo(Prefetch&) const;
void subtractFrom(Prefetch&) const;
const std::string& getTag() const { return tag; }
bool isPull() const { return pull; }
- bool isAcquired() const { return acquired; }
- void acquire(DeliveryIds& results);
friend bool operator<(const DeliveryRecord&, const DeliveryRecord&);
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
index 676f9e4b3d..6c3d960d1f 100644
--- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
+++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
@@ -64,7 +64,7 @@ void MultiVersionConnectionInputHandler::idleIn()
bool MultiVersionConnectionInputHandler::doOutput()
{
- return check(false) && handler->doOutput();
+ return handler.get() && handler->doOutput();
}
qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation()
@@ -74,17 +74,14 @@ qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiat
void MultiVersionConnectionInputHandler::closed()
{
- check();
- handler->closed();
+ if (handler.get()) handler->closed();
+ //else closed before initiated, nothing to do
}
-bool MultiVersionConnectionInputHandler::check(bool fail)
+void MultiVersionConnectionInputHandler::check()
{
if (!handler.get()) {
- if (fail) throw qpid::framing::InternalErrorException("Handler not initialised!");
- else return false;
- } else {
- return true;
+ throw qpid::framing::InternalErrorException("Handler not initialised!");
}
}
diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
index 4301eba57c..440c00c09a 100644
--- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
+++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
@@ -38,7 +38,7 @@ class MultiVersionConnectionInputHandler : public qpid::sys::ConnectionInputHand
Broker& broker;
const std::string id;
- bool check(bool fail = true);
+ void check();
public:
MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id);
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 5851eeeafb..f372c60044 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -393,7 +393,7 @@ void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
++end;
}
- for_each(start, end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ for_each(start, end, boost::bind(&SemanticState::complete, this, _1));
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -433,20 +433,23 @@ void SemanticState::requestDispatch(ConsumerImpl& c)
}
}
-void SemanticState::adjustFlow(const DeliveryRecord& delivery)
+void SemanticState::complete(DeliveryRecord& delivery)
{
delivery.subtractFrom(outstanding);
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
- get_pointer(i)->adjustFlow(delivery);
+ get_pointer(i)->complete(delivery);
}
}
-void SemanticState::ConsumerImpl::adjustFlow(const DeliveryRecord& delivery)
+void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
{
- if (windowing) {
- if (msgCredit != 0xFFFFFFFF) msgCredit++;
- if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
+ if (!delivery.isComplete()) {
+ delivery.complete();
+ if (windowing) {
+ if (msgCredit != 0xFFFFFFFF) msgCredit++;
+ if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit();
+ }
}
}
@@ -662,15 +665,16 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
dtxBuffer->enlist(txAck);
}
} else {
- for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
- unacked.erase(range.start, range.end);
+ for_each(range.start, range.end, bind2nd(mem_fun_ref(&DeliveryRecord::accept), 0));
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
}
}
void SemanticState::completed(DeliveryId first, DeliveryId last)
{
AckRange range = findRange(first, last);
- for_each(range.start, range.end, boost::bind(&SemanticState::adjustFlow, this, _1));
+ for_each(range.start, range.end, boost::bind(&SemanticState::complete, this, _1));
+ unacked.remove_if(mem_fun_ref(&DeliveryRecord::isRedundant));
requestDispatch();
}
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 8648135cae..3d31d5a5a2 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -88,7 +88,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void addMessageCredit(uint32_t value);
void flush();
void stop();
- void adjustFlow(const DeliveryRecord&);
+ void complete(DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
bool isBlocked() const { return blocked; }
@@ -122,7 +122,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void checkDtxTimeout();
ConsumerImpl& find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
- void adjustFlow(const DeliveryRecord&);
+ void complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
void requestDispatch(ConsumerImpl&);