diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 69 |
1 files changed, 35 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 73a0a5cf7b..cfc379f47c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -70,7 +70,7 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) deliveryAdapter(da), tagGenerator("sgen"), dtxSelected(false), - authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()), + authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), userID(getSession().getConnection().getUserId()), userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), @@ -88,7 +88,7 @@ void SemanticState::closed() { //prevent requeued messages being redelivered to consumers for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { disable(i->second); - } + } if (dtxBuffer.get()) { dtxBuffer->fail(); } @@ -107,7 +107,7 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void SemanticState::consume(const string& tag, +void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { @@ -116,8 +116,7 @@ void SemanticState::consume(const string& tag, consumers[tag] = c; } -bool SemanticState::cancel(const string& tag) -{ +void SemanticState::cancel(const string& tag){ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { cancel(i->second); @@ -125,9 +124,7 @@ bool SemanticState::cancel(const string& tag) //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); - return true; - } else { - return false; + } } @@ -197,7 +194,7 @@ void SemanticState::endDtx(const std::string& xid, bool fail) dtxBuffer->fail(); } else { dtxBuffer->markEnded(); - } + } dtxBuffer.reset(); } @@ -257,9 +254,9 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, + const string& _name, + Queue::shared_ptr _queue, bool ack, bool _acquire, bool _exclusive, @@ -268,20 +265,20 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const framing::FieldTable& _arguments -) : +) : Consumer(_acquire), - parent(_parent), - name(_name), - queue(_queue), - ackExpected(ack), + parent(_parent), + name(_name), + queue(_queue), + ackExpected(ack), acquire(_acquire), - blocked(true), + blocked(true), windowing(true), exclusive(_exclusive), resumeId(_resumeId), resumeTtl(_resumeTtl), arguments(_arguments), - msgCredit(0), + msgCredit(0), byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), @@ -292,7 +289,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, { ManagementAgent* agent = parent->session.getBroker().getManagementAgent(); qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session)); - + if (agent != 0) { mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, @@ -334,7 +331,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); - } + } if (acquire && !ackExpected) { queue->dequeue(0, msg); } @@ -354,7 +351,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages // in future. - // + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -375,7 +372,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { assertClusterSafe(); uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; + uint32_t originalByteCredit = byteCredit; if (msgCredit != 0xFFFFFFFF) { msgCredit--; } @@ -385,7 +382,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); - + } bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) @@ -399,7 +396,7 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) return enoughCredit; } -SemanticState::ConsumerImpl::~ConsumerImpl() +SemanticState::ConsumerImpl::~ConsumerImpl() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -417,7 +414,7 @@ void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c) Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); - if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { + if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { Queue::tryAutoDelete(session.getBroker(), queue); } } @@ -460,15 +457,16 @@ const std::string nullstring; void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); - + std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) + if (!cacheExchange || cacheExchange->getName() != exchangeName) cacheExchange = session.getBroker().getExchanges().get(exchangeName); cacheExchange->setProperties(msg); /* verify the userid if specified: */ std::string id = msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring; + if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName))) { QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id); @@ -486,7 +484,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { if (!strategy.delivered) { //TODO:if discard-unroutable, just drop it - //TODO:else if accept-mode is explicit, reject it + //TODO:else if accept-mode is explicit, reject it //else route it to alternate exchange if (cacheExchange->getAlternate()) { cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); @@ -515,7 +513,7 @@ void SemanticState::ConsumerImpl::requestDispatch() } bool SemanticState::complete(DeliveryRecord& delivery) -{ +{ ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { i->second->complete(delivery); @@ -543,7 +541,7 @@ void SemanticState::recover(bool requeue) unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ - for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); + for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); //unconfirmed messages re redelivered and therefore have their //id adjusted, confirmed messages are not and so the ordering //w.r.t id is lost @@ -675,7 +673,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const { } AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) -{ +{ return DeliveryRecord::findRange(unacked, first, last); } @@ -766,13 +764,13 @@ void SemanticState::accepted(const SequenceSet& commands) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: accumulatedAck.add(commands); - + if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); - dtxBuffer->enlist(txAck); + dtxBuffer->enlist(txAck); //mark the relevant messages as 'ended' in unacked //if the messages are already completed, they can be @@ -794,6 +792,7 @@ void SemanticState::accepted(const SequenceSet& commands) { } void SemanticState::completed(const SequenceSet& commands) { + assertClusterSafe(); DeliveryRecords::iterator removed = remove_if(unacked.begin(), unacked.end(), isInSequenceSetAnd(commands, @@ -804,6 +803,7 @@ void SemanticState::completed(const SequenceSet& commands) { void SemanticState::attached() { + assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); session.getConnection().outputTasks.addOutputTask(i->second.get()); @@ -813,6 +813,7 @@ void SemanticState::attached() void SemanticState::detached() { + assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); session.getConnection().outputTasks.removeOutputTask(i->second.get()); |