summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp54
1 files changed, 27 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index bdd5f33601..87d7f6f97b 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -86,7 +86,7 @@ bool SemanticState::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-void SemanticState::consume(const string& tag,
+void SemanticState::consume(const string& tag,
Queue::shared_ptr queue, bool ackRequired, bool acquire,
bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments)
{
@@ -103,7 +103,7 @@ void SemanticState::cancel(const string& tag){
//should cancel all unacked messages for this consumer so that
//they are not redelivered on recovery
for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag));
-
+
}
}
@@ -173,7 +173,7 @@ void SemanticState::endDtx(const std::string& xid, bool fail)
dtxBuffer->fail();
} else {
dtxBuffer->markEnded();
- }
+ }
dtxBuffer.reset();
}
@@ -233,9 +233,9 @@ void SemanticState::record(const DeliveryRecord& delivery)
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
- const string& _name,
- Queue::shared_ptr _queue,
+SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
+ const string& _name,
+ Queue::shared_ptr _queue,
bool ack,
bool _acquire,
bool _exclusive,
@@ -244,20 +244,20 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
const framing::FieldTable& _arguments
-) :
+) :
Consumer(_acquire),
- parent(_parent),
- name(_name),
- queue(_queue),
- ackExpected(ack),
+ parent(_parent),
+ name(_name),
+ queue(_queue),
+ ackExpected(ack),
acquire(_acquire),
- blocked(true),
+ blocked(true),
windowing(true),
exclusive(_exclusive),
resumeId(_resumeId),
resumeTtl(_resumeTtl),
arguments(_arguments),
- msgCredit(0),
+ msgCredit(0),
byteCredit(0),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
@@ -279,7 +279,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
if (!ackExpected) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
- }
+ }
if (acquire && !ackExpected) {
queue->dequeue(0, msg);
}
@@ -297,7 +297,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
// checkCredit fails because the message is to big, we should
// remain on queue's listener list for possible smaller messages
// in future.
- //
+ //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
@@ -305,7 +305,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
{
uint32_t originalMsgCredit = msgCredit;
- uint32_t originalByteCredit = byteCredit;
+ uint32_t originalByteCredit = byteCredit;
if (msgCredit != 0xFFFFFFFF) {
msgCredit--;
}
@@ -315,13 +315,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg)
QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent
<< ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
-
+
}
bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
- QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
+ QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent
<< ", bytes: " << byteCredit << " msgs: " << msgCredit);
return false;
} else {
@@ -341,7 +341,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
Queue::shared_ptr queue = c->getQueue();
if(queue) {
queue->cancel(c);
- if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
+ if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {
Queue::tryAutoDelete(session.getBroker(), queue);
}
}
@@ -366,7 +366,7 @@ const std::string nullstring;
void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
-
+
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName)
cacheExchange = session.getBroker().getExchanges().get(exchangeName);
@@ -393,7 +393,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
if (!strategy.delivered) {
//TODO:if discard-unroutable, just drop it
- //TODO:else if accept-mode is explicit, reject it
+ //TODO:else if accept-mode is explicit, reject it
//else route it to alternate exchange
if (cacheExchange->getAlternate()) {
cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
@@ -402,7 +402,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
msg->destroy();
}
}
-
+ msg->releaseContent(); // release frames if release has been requested
}
void SemanticState::requestDispatch()
@@ -421,7 +421,7 @@ void SemanticState::ConsumerImpl::requestDispatch()
}
bool SemanticState::complete(DeliveryRecord& delivery)
-{
+{
ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
if (i != consumers.end()) {
i->second->complete(delivery);
@@ -449,7 +449,7 @@ void SemanticState::recover(bool requeue)
unacked.clear();
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
- for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
+ for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this));
//unconfirmed messages re redelivered and therefore have their
//id adjusted, confirmed messages are not and so the ordering
//w.r.t id is lost
@@ -570,7 +570,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const {
}
AckRange SemanticState::findRange(DeliveryId first, DeliveryId last)
-{
+{
return DeliveryRecord::findRange(unacked, first, last);
}
@@ -655,13 +655,13 @@ void SemanticState::accepted(const SequenceSet& commands) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
accumulatedAck.add(commands);
-
+
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
//unacked and record it against that transaction
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
+ dtxBuffer->enlist(txAck);
//mark the relevant messages as 'ended' in unacked
//if the messages are already completed, they can be