diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 130 |
1 files changed, 49 insertions, 81 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 9a84db547c..5d96467bbf 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -29,7 +29,7 @@ #include "qpid/broker/SessionContext.h" #include "qpid/broker/SessionOutputException.h" #include "qpid/broker/TxAccept.h" -#include "qpid/broker/TxPublish.h" +#include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/SequenceSet.h" @@ -65,9 +65,8 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::broker; -SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) +SemanticState::SemanticState(SessionState& ss) : session(ss), - deliveryAdapter(da), tagGenerator("sgen"), dtxSelected(false), authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()), @@ -89,7 +88,7 @@ void SemanticState::closed() { if (dtxBuffer.get()) { dtxBuffer->fail(); } - recover(true); + requeue(); //now unsubscribe, which may trigger queue deletion and thus //needs to occur after the requeueing of unacked messages @@ -124,7 +123,7 @@ void SemanticState::consume(const string& tag, resumeId, resumeTtl, arguments); if (!c) // Create plain consumer c = ConsumerImpl::shared_ptr( - new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, + new ConsumerImpl(this, name, queue, ackRequired, acquire ? CONSUMER : BROWSER, exclusive, tag, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; @@ -281,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const string& _name, Queue::shared_ptr _queue, bool ack, - bool _acquire, + SubscriptionType type, bool _exclusive, const string& _tag, const string& _resumeId, @@ -289,11 +288,11 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, const framing::FieldTable& _arguments ) : - Consumer(_name, _acquire), +Consumer(_name, type), parent(_parent), queue(_queue), ackExpected(ack), - acquire(_acquire), + acquire(type == CONSUMER), blocked(true), exclusive(_exclusive), resumeId(_resumeId), @@ -340,32 +339,42 @@ OwnershipToken* SemanticState::ConsumerImpl::getSession() return &(parent->session); } -bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) +bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg) +{ + return deliver(cursor, msg, shared_from_this()); +} +bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer) { assertClusterSafe(); - allocateCredit(msg.payload); - DeliveryRecord record(msg, msg.queue->shared_from_this(), getTag(), - shared_from_this(), acquire, !ackExpected, credit.isWindowMode(), 0); + allocateCredit(msg); + DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(), + consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg)); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset - parent->deliver(record, sync); + const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding()); + + record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(), + ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE, + acquire ? message::ACQUIRE_MODE_PRE_ACQUIRED : message::ACQUIRE_MODE_NOT_ACQUIRED, + msg.getAnnotations(), + sync)); if (credit.isWindowMode() || ackExpected || !acquire) { parent->record(record); } if (acquire && !ackExpected) { // auto acquire && auto accept - msg.queue->dequeue(0, msg); + queue->dequeue(0 /*ctxt*/, cursor); record.setEnded(); } if (mgmtObject) { mgmtObject->inc_delivered(); } return true; } -bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) +bool SemanticState::ConsumerImpl::filter(const Message&) { return true; } -bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) +bool SemanticState::ConsumerImpl::accept(const Message& msg) { assertClusterSafe(); // TODO aconway 2009-06-08: if we have byte & message credit but @@ -389,21 +398,21 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) { } } -void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) +void SemanticState::ConsumerImpl::allocateCredit(const Message& msg) { assertClusterSafe(); Credit original = credit; - credit.consume(1, msg->getRequiredCredit()); + credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg)); QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << original << " now " << credit); } -bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) +bool SemanticState::ConsumerImpl::checkCredit(const Message& msg) { - bool enoughCredit = credit.check(1, msg->getRequiredCredit()); + bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg)); QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient") - << " credit for message of " << msg->getRequiredCredit() << " bytes: " + << " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: " << credit); return enoughCredit; } @@ -421,7 +430,6 @@ void SemanticState::disable(ConsumerImpl::shared_ptr c) session.getConnection().outputTasks.removeOutputTask(c.get()); } - void SemanticState::cancel(ConsumerImpl::shared_ptr c) { disable(c); @@ -435,49 +443,20 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) c->cancel(); } -void SemanticState::handle(intrusive_ptr<Message> msg) { - if (txBuffer.get()) { - TxPublish* deliverable(new TxPublish(msg)); - TxOp::shared_ptr op(deliverable); - route(msg, *deliverable); - txBuffer->enlist(op); - } else { - DeliverableMessage deliverable(msg); - route(msg, deliverable); - if (msg->isContentReleaseRequested()) { - // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the - // presence of these messages). Do not change these without also checking these tests. - if (msg->isContentReleaseBlocked()) { - QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" << - std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked"); - } else { - msg->releaseContent(); - QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" << - std::hex << msg->getPersistenceId() << std::dec << ": Content released"); - } - } - } -} - -namespace +TxBuffer* SemanticState::getTxBuffer() { -const std::string nullstring; + return txBuffer.get(); } -void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { - msg->computeExpiration(getSession().getBroker().getExpiryPolicy()); +void SemanticState::route(Message& msg, Deliverable& strategy) { + msg.computeExpiration(getSession().getBroker().getExpiryPolicy()); - std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName - || cacheExchange->isDestroyed()) - { + std::string exchangeName = qpid::broker::amqp_0_10::MessageTransfer::get(msg).getExchangeName(); + if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) cacheExchange = session.getBroker().getExchanges().get(exchangeName); - } - cacheExchange->setProperties(msg); /* verify the userid if specified: */ - std::string id = - msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring; + std::string id = msg.getUserId(); if (authMsg && !id.empty() && !session.getConnection().isAuthenticatedUser(id)) { QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id); @@ -487,9 +466,9 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { AclModule* acl = getSession().getBroker().getAcl(); if (acl && acl->doTransferAcl()) { - if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() )) + if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg.getRoutingKey() )) throw UnauthorizedAccessException(QPID_MSG(userID << " cannot publish to " << - exchangeName << " with routing-key " << msg->getRoutingKey())); + exchangeName << " with routing-key " << msg.getRoutingKey())); } cacheExchange->route(strategy); @@ -501,9 +480,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { if (cacheExchange->getAlternate()) { cacheExchange->getAlternate()->route(strategy); } - if (!strategy.delivered) { - msg->destroy(); - } } } @@ -543,28 +519,20 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) } } -void SemanticState::recover(bool requeue) +void SemanticState::requeue() { - if(requeue){ - //take copy and clear unacked as requeue may result in redelivery to this session - //which will in turn result in additions to unacked - DeliveryRecords copy = unacked; - unacked.clear(); - for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); - }else{ - for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); - //unconfirmed messages re redelivered and therefore have their - //id adjusted, confirmed messages are not and so the ordering - //w.r.t id is lost - sort(unacked.begin(), unacked.end()); - } + //take copy and clear unacked as requeue may result in redelivery to this session + //which will in turn result in additions to unacked + DeliveryRecords copy = unacked; + unacked.clear(); + for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); getSession().setUnackedCount(unacked.size()); } -void SemanticState::deliver(DeliveryRecord& msg, bool sync) -{ - return deliveryAdapter.deliver(msg, sync); -} + +SessionContext& SemanticState::getSession() { return session; } +const SessionContext& SemanticState::getSession() const { return session; } + const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const { |