diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 54 |
1 files changed, 27 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index bdd5f33601..87d7f6f97b 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -86,7 +86,7 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void SemanticState::consume(const string& tag, +void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { @@ -103,7 +103,7 @@ void SemanticState::cancel(const string& tag){ //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); - + } } @@ -173,7 +173,7 @@ void SemanticState::endDtx(const std::string& xid, bool fail) dtxBuffer->fail(); } else { dtxBuffer->markEnded(); - } + } dtxBuffer.reset(); } @@ -233,9 +233,9 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, + const string& _name, + Queue::shared_ptr _queue, bool ack, bool _acquire, bool _exclusive, @@ -244,20 +244,20 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const framing::FieldTable& _arguments -) : +) : Consumer(_acquire), - parent(_parent), - name(_name), - queue(_queue), - ackExpected(ack), + parent(_parent), + name(_name), + queue(_queue), + ackExpected(ack), acquire(_acquire), - blocked(true), + blocked(true), windowing(true), exclusive(_exclusive), resumeId(_resumeId), resumeTtl(_resumeTtl), arguments(_arguments), - msgCredit(0), + msgCredit(0), byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), @@ -279,7 +279,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) if (!ackExpected) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); - } + } if (acquire && !ackExpected) { queue->dequeue(0, msg); } @@ -297,7 +297,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages // in future. - // + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -305,7 +305,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; + uint32_t originalByteCredit = byteCredit; if (msgCredit != 0xFFFFFFFF) { msgCredit--; } @@ -315,13 +315,13 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) QPID_LOG(debug, "Credit allocated for '" << name << "' on " << parent << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); - + } bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) { if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { - QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent + QPID_LOG(debug, "Not enough credit for '" << name << "' on " << parent << ", bytes: " << byteCredit << " msgs: " << msgCredit); return false; } else { @@ -341,7 +341,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); - if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { + if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { Queue::tryAutoDelete(session.getBroker(), queue); } } @@ -366,7 +366,7 @@ const std::string nullstring; void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); - + std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName) cacheExchange = session.getBroker().getExchanges().get(exchangeName); @@ -393,7 +393,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { if (!strategy.delivered) { //TODO:if discard-unroutable, just drop it - //TODO:else if accept-mode is explicit, reject it + //TODO:else if accept-mode is explicit, reject it //else route it to alternate exchange if (cacheExchange->getAlternate()) { cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); @@ -402,7 +402,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->destroy(); } } - + msg->releaseContent(); // release frames if release has been requested } void SemanticState::requestDispatch() @@ -421,7 +421,7 @@ void SemanticState::ConsumerImpl::requestDispatch() } bool SemanticState::complete(DeliveryRecord& delivery) -{ +{ ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { i->second->complete(delivery); @@ -449,7 +449,7 @@ void SemanticState::recover(bool requeue) unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ - for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); + for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); //unconfirmed messages re redelivered and therefore have their //id adjusted, confirmed messages are not and so the ordering //w.r.t id is lost @@ -570,7 +570,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const { } AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) -{ +{ return DeliveryRecord::findRange(unacked, first, last); } @@ -655,13 +655,13 @@ void SemanticState::accepted(const SequenceSet& commands) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: accumulatedAck.add(commands); - + if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); - dtxBuffer->enlist(txAck); + dtxBuffer->enlist(txAck); //mark the relevant messages as 'ended' in unacked //if the messages are already completed, they can be |