summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/DeliveryRecord.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/DeliveryRecord.cpp')
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp152
1 files changed, 64 insertions, 88 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 530dca99a4..22ec5e86a0 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -18,91 +18,72 @@
* under the License.
*
*/
-#include "DeliveryRecord.h"
-#include "DeliverableMessage.h"
-#include "SemanticState.h"
-#include "Exchange.h"
+#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/Exchange.h"
#include "qpid/log/Statement.h"
+#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/MessageTransferBody.h"
+using namespace qpid;
using namespace qpid::broker;
using std::string;
DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
- Queue::shared_ptr _queue,
- const std::string _tag,
- DeliveryToken::shared_ptr _token,
- const DeliveryId _id,
- bool _acquired, bool accepted) : msg(_msg),
- queue(_queue),
- tag(_tag),
- token(_token),
- id(_id),
- acquired(_acquired),
- pull(false),
- cancelled(false),
- credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
- size(msg.payload ? msg.payload->contentSize() : 0),
- completed(false),
- ended(accepted)
-{
- if (accepted) setEnded();
-}
-
-DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
- Queue::shared_ptr _queue,
- const DeliveryId _id) : msg(_msg),
- queue(_queue),
- id(_id),
- acquired(true),
- pull(true),
- cancelled(false),
- credit(msg.payload ? msg.payload->getRequiredCredit() : 0),
- size(msg.payload ? msg.payload->contentSize() : 0),
- completed(false),
- ended(false)
+ const Queue::shared_ptr& _queue,
+ const std::string& _tag,
+ bool _acquired,
+ bool accepted,
+ bool _windowing,
+ uint32_t _credit) : msg(_msg),
+ queue(_queue),
+ tag(_tag),
+ acquired(_acquired),
+ acceptExpected(!accepted),
+ cancelled(false),
+ completed(false),
+ ended(accepted && acquired),
+ windowing(_windowing),
+ credit(msg.payload ? msg.payload->getRequiredCredit() : _credit)
{}
-void DeliveryRecord::setEnded()
+bool DeliveryRecord::setEnded()
{
ended = true;
//reset msg pointer, don't need to hold on to it anymore
msg.payload = boost::intrusive_ptr<Message>();
-
QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id);
-}
-
-bool DeliveryRecord::matches(DeliveryId tag) const{
- return id == tag;
-}
-
-bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{
- return matches(tag) || after(tag);
-}
-
-bool DeliveryRecord::after(DeliveryId tag) const{
- return id > tag;
-}
-
-bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{
- return range->contains(id);
+ return isRedundant();
}
void DeliveryRecord::redeliver(SemanticState* const session) {
if (!ended) {
- if(pull || cancelled){
- //if message was originally sent as response to get, we must requeue it
-
- //or if subscription was cancelled, requeue it (waiting for
+ if(cancelled){
+ //if subscription was cancelled, requeue it (waiting for
//final confirmation for AMQP WG on this case)
-
requeue();
}else{
msg.payload->redeliver();//mark as redelivered
- id = session->redeliver(msg, token);
+ session->deliver(*this, false);
}
}
}
+void DeliveryRecord::deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize)
+{
+ id = deliveryId;
+ if (msg.payload->getRedelivered()){
+ msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true);
+ }
+
+ framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), tag, acceptExpected ? 0 : 1, acquired ? 0 : 1)));
+ method.setEof(false);
+ h.handle(method);
+ msg.payload->sendHeader(h, framesize);
+ msg.payload->sendContent(*queue, h, framesize);
+}
+
void DeliveryRecord::requeue() const
{
if (acquired && !ended) {
@@ -123,25 +104,29 @@ void DeliveryRecord::release(bool setRedelivered)
}
}
-void DeliveryRecord::complete()
-{
+void DeliveryRecord::complete() {
completed = true;
}
-void DeliveryRecord::accept(TransactionContext* ctxt) {
+bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired && !ended) {
- queue->dequeue(ctxt, msg.payload);
+ queue->dequeue(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
+ return isRedundant();
}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
if (acquired && !ended) {
- queue->dequeue(ctxt, msg.payload);
+ queue->dequeue(ctxt, msg);
}
}
+void DeliveryRecord::committed() const{
+ queue->dequeueCommitted(msg);
+}
+
void DeliveryRecord::reject()
{
Exchange::shared_ptr alternate = queue->getAlternateExchange();
@@ -161,29 +146,14 @@ uint32_t DeliveryRecord::getCredit() const
return credit;
}
-
-void DeliveryRecord::addTo(Prefetch& prefetch) const{
- if(!pull){
- //ignore 'pulled' messages (i.e. those that were sent in
- //response to get) when calculating prefetch
- prefetch.size += size;
- prefetch.count++;
- }
-}
-
-void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{
- if(!pull){
- //ignore 'pulled' messages (i.e. those that were sent in
- //response to get) when calculating prefetch
- prefetch.size -= size;
- prefetch.count--;
- }
-}
-
void DeliveryRecord::acquire(DeliveryIds& results) {
if (queue->acquire(msg)) {
acquired = true;
results.push_back(id);
+ if (!acceptExpected) {
+ if (ended) { QPID_LOG(error, "Can't dequeue ended message"); }
+ else { queue->dequeue(0, msg); setEnded(); }
+ }
} else {
QPID_LOG(info, "Message already acquired " << id.getValue());
}
@@ -195,6 +165,16 @@ void DeliveryRecord::cancel(const std::string& cancelledTag)
cancelled = true;
}
+AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last)
+{
+ DeliveryRecords::iterator start = lower_bound(records.begin(), records.end(), first);
+ // Find end - position it just after the last record in range
+ DeliveryRecords::iterator end = lower_bound(records.begin(), records.end(), last);
+ if (end != records.end() && end->getId() == last) ++end;
+ return AckRange(start, end);
+}
+
+
namespace qpid {
namespace broker {
@@ -206,9 +186,5 @@ std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r)
return out;
}
-bool operator<(const DeliveryRecord& a, const DeliveryRecord& b)
-{
- return a.id < b.id;
-}
}}