diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 66 |
1 files changed, 33 insertions, 33 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index cfc379f47c..ba1f989f7c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -88,7 +88,7 @@ void SemanticState::closed() { //prevent requeued messages being redelivered to consumers for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { disable(i->second); - } + } if (dtxBuffer.get()) { dtxBuffer->fail(); } @@ -107,7 +107,7 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void SemanticState::consume(const string& tag, +void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { @@ -116,7 +116,8 @@ void SemanticState::consume(const string& tag, consumers[tag] = c; } -void SemanticState::cancel(const string& tag){ +bool SemanticState::cancel(const string& tag) +{ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { cancel(i->second); @@ -124,7 +125,9 @@ void SemanticState::cancel(const string& tag){ //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); - + return true; + } else { + return false; } } @@ -194,7 +197,7 @@ void SemanticState::endDtx(const std::string& xid, bool fail) dtxBuffer->fail(); } else { dtxBuffer->markEnded(); - } + } dtxBuffer.reset(); } @@ -254,9 +257,9 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, + const string& _name, + Queue::shared_ptr _queue, bool ack, bool _acquire, bool _exclusive, @@ -265,20 +268,20 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const framing::FieldTable& _arguments -) : +) : Consumer(_acquire), - parent(_parent), - name(_name), - queue(_queue), - ackExpected(ack), + parent(_parent), + name(_name), + queue(_queue), + ackExpected(ack), acquire(_acquire), - blocked(true), + blocked(true), windowing(true), exclusive(_exclusive), resumeId(_resumeId), resumeTtl(_resumeTtl), arguments(_arguments), - msgCredit(0), + msgCredit(0), byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), @@ -289,7 +292,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, { ManagementAgent* agent = parent->session.getBroker().getManagementAgent(); qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session)); - + if (agent != 0) { mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, @@ -331,7 +334,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); - } + } if (acquire && !ackExpected) { queue->dequeue(0, msg); } @@ -351,7 +354,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages // in future. - // + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -372,7 +375,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { assertClusterSafe(); uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; + uint32_t originalByteCredit = byteCredit; if (msgCredit != 0xFFFFFFFF) { msgCredit--; } @@ -382,7 +385,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); - + } bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) @@ -396,7 +399,7 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) return enoughCredit; } -SemanticState::ConsumerImpl::~ConsumerImpl() +SemanticState::ConsumerImpl::~ConsumerImpl() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -414,7 +417,7 @@ void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c) Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); - if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { + if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { Queue::tryAutoDelete(session.getBroker(), queue); } } @@ -457,7 +460,7 @@ const std::string nullstring; void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); - + std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName) cacheExchange = session.getBroker().getExchanges().get(exchangeName); @@ -466,7 +469,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { /* verify the userid if specified: */ std::string id = msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring; - + if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName))) { QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id); @@ -484,7 +487,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { if (!strategy.delivered) { //TODO:if discard-unroutable, just drop it - //TODO:else if accept-mode is explicit, reject it + //TODO:else if accept-mode is explicit, reject it //else route it to alternate exchange if (cacheExchange->getAlternate()) { cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); @@ -513,7 +516,7 @@ void SemanticState::ConsumerImpl::requestDispatch() } bool SemanticState::complete(DeliveryRecord& delivery) -{ +{ ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { i->second->complete(delivery); @@ -541,7 +544,7 @@ void SemanticState::recover(bool requeue) unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ - for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); + for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); //unconfirmed messages re redelivered and therefore have their //id adjusted, confirmed messages are not and so the ordering //w.r.t id is lost @@ -673,7 +676,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const { } AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) -{ +{ return DeliveryRecord::findRange(unacked, first, last); } @@ -764,13 +767,13 @@ void SemanticState::accepted(const SequenceSet& commands) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: accumulatedAck.add(commands); - + if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); - dtxBuffer->enlist(txAck); + dtxBuffer->enlist(txAck); //mark the relevant messages as 'ended' in unacked //if the messages are already completed, they can be @@ -792,7 +795,6 @@ void SemanticState::accepted(const SequenceSet& commands) { } void SemanticState::completed(const SequenceSet& commands) { - assertClusterSafe(); DeliveryRecords::iterator removed = remove_if(unacked.begin(), unacked.end(), isInSequenceSetAnd(commands, @@ -803,7 +805,6 @@ void SemanticState::completed(const SequenceSet& commands) { void SemanticState::attached() { - assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); session.getConnection().outputTasks.addOutputTask(i->second.get()); @@ -813,7 +814,6 @@ void SemanticState::attached() void SemanticState::detached() { - assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); session.getConnection().outputTasks.removeOutputTask(i->second.get()); |